mikemccand commented on code in PR #15979:
URL: https://github.com/apache/lucene/pull/15979#discussion_r3389913556


##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsReader.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Stream;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.SegmentReadState;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.internal.hppc.IntObjectHashMap;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.DataAccessHint;
+import org.apache.lucene.store.FileDataHint;
+import org.apache.lucene.store.FileTypeHint;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IOContext.FileOpenHint;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RandomAccessInput;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LongValues;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.hnsw.RandomVectorScorer;
+import org.apache.lucene.util.packed.DirectReader;
+
+/**
+ * Reads a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p>Layout: {@code .dvc} holds pool vector bytes (contiguous per pool) 
followed by per-field DISI
+ * + packed {@code docOrd → vecOrd} maps. {@code .dvm} holds pool/field 
metadata.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsReader extends FlatVectorsReader {
+
+  private static final long SHALLOW_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsReader.class);
+
+  private final FlatVectorsScorer vectorScorer;
+  private final FlatVectorsScorer translatingScorer;
+  private final FieldInfos fieldInfos;
+  private final IndexInput vectorData;
+  private final IOContext dataContext;
+
+  private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>();
+
+  public DedupFlatVectorsReader(SegmentReadState state, FlatVectorsScorer 
scorer)
+      throws IOException {
+    this(state, scorer, DataAccessHint.RANDOM);
+  }
+
+  public DedupFlatVectorsReader(
+      SegmentReadState state, FlatVectorsScorer scorer, DataAccessHint 
accessHint)
+      throws IOException {
+    this.vectorScorer = scorer;
+    this.translatingScorer = new DedupTranslatingScorer(scorer);
+    this.fieldInfos = state.fieldInfos;
+
+    int versionMeta;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, 
DedupFlatVectorsFormat.META_EXTENSION);
+    try (ChecksumIndexInput meta = 
state.directory.openChecksumInput(metaFileName)) {
+      Throwable priorE = null;
+      try {
+        versionMeta =
+            CodecUtil.checkIndexHeader(
+                meta,
+                DedupFlatVectorsFormat.META_CODEC_NAME,
+                DedupFlatVectorsFormat.VERSION_START,
+                DedupFlatVectorsFormat.VERSION_CURRENT,
+                state.segmentInfo.getId(),
+                state.segmentSuffix);
+        readMetaBody(meta, state.fieldInfos);
+      } catch (Throwable e) {
+        priorE = e;
+        throw e;
+      } finally {
+        CodecUtil.checkFooter(meta, priorE);
+      }
+    }
+
+    FileOpenHint[] hints =
+        Stream.of(FileTypeHint.DATA, FileDataHint.KNN_VECTORS, accessHint)
+            .filter(Objects::nonNull)

Review Comment:
   Is this because `accessHint` might be null?  Can we just `null` check it up 
front, and throw `NPE` -- or are callers really allowed to pass `null` in 
general?



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsFormat.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import java.io.IOException;
+import org.apache.lucene.codecs.hnsw.FlatVectorScorerUtil;
+import org.apache.lucene.codecs.hnsw.FlatVectorsFormat;
+import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene90.IndexedDISI;
+import org.apache.lucene.index.SegmentReadState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * Flat vector format that stores each unique vector exactly once per segment. 
Multiple documents
+ * (within the same field, and across fields with matching {@code (dimension, 
encoding)}) may share
+ * the same physical vector, dramatically reducing on-disk size when many 
docs/fields point at the
+ * same vector data.
+ *
+ * <p>The format groups fields into "pools" keyed by {@code (dimension, 
encoding)}. All fields in
+ * the same pool share unique-vector storage; per-field metadata records how 
each doc-ord maps to a

Review Comment:
   Hmm, is `encoding` here the `String` SPI identifier, or a specific instance of one?



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsFormat.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import java.io.IOException;
+import org.apache.lucene.codecs.hnsw.FlatVectorScorerUtil;
+import org.apache.lucene.codecs.hnsw.FlatVectorsFormat;
+import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene90.IndexedDISI;
+import org.apache.lucene.index.SegmentReadState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * Flat vector format that stores each unique vector exactly once per segment. 
Multiple documents
+ * (within the same field, and across fields with matching {@code (dimension, 
encoding)}) may share
+ * the same physical vector, dramatically reducing on-disk size when many 
docs/fields point at the
+ * same vector data.
+ *
+ * <p>The format groups fields into "pools" keyed by {@code (dimension, 
encoding)}. All fields in
+ * the same pool share unique-vector storage; per-field metadata records how 
each doc-ord maps to a
+ * vector-ord within the pool.
+ *

Review Comment:
   Maybe add a paragraph saying how dedup is done?  Does it store every vector
in heap during indexing before the segment is flushed?  Or does Lucene already
do that today, and we are just building a hash map over those (not
double-heap-storing)?  Maybe state the rough RAM requirements -- is it a 64-bit
hash, etc.?  Also explain how merging works -- then we are not caching in RAM,
but to merge the pools, is it iterating through a hash map?  Are vectors sorted,
so it's a merge sort at merge time?  It would be good to explain the high-level
approach in these format docs.



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import static 
org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.KnnVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocsWithFieldSet;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.DirectWriter;
+
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, 
with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous 
unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link 
DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector 
to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be 
byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into 
{@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment 
(sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the 
merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, 
sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc 
loop becomes two int

Review Comment:
   Let's add good testing for this?  E.g. a case where merging deletes nearly
every doc?  And, sometimes, one where it deletes every doc that had a vector
(yet some non-vector docs are merged).



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsReader.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Stream;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.SegmentReadState;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.internal.hppc.IntObjectHashMap;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.DataAccessHint;
+import org.apache.lucene.store.FileDataHint;
+import org.apache.lucene.store.FileTypeHint;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IOContext.FileOpenHint;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RandomAccessInput;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LongValues;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.hnsw.RandomVectorScorer;
+import org.apache.lucene.util.packed.DirectReader;
+
+/**
+ * Reads a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p>Layout: {@code .dvc} holds pool vector bytes (contiguous per pool) 
followed by per-field DISI
+ * + packed {@code docOrd → vecOrd} maps. {@code .dvm} holds pool/field 
metadata.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsReader extends FlatVectorsReader {
+
+  private static final long SHALLOW_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsReader.class);
+
+  private final FlatVectorsScorer vectorScorer;
+  private final FlatVectorsScorer translatingScorer;
+  private final FieldInfos fieldInfos;
+  private final IndexInput vectorData;
+  private final IOContext dataContext;
+
+  private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>();
+
+  public DedupFlatVectorsReader(SegmentReadState state, FlatVectorsScorer 
scorer)
+      throws IOException {
+    this(state, scorer, DataAccessHint.RANDOM);
+  }
+
+  public DedupFlatVectorsReader(
+      SegmentReadState state, FlatVectorsScorer scorer, DataAccessHint 
accessHint)
+      throws IOException {
+    this.vectorScorer = scorer;
+    this.translatingScorer = new DedupTranslatingScorer(scorer);
+    this.fieldInfos = state.fieldInfos;
+
+    int versionMeta;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, 
DedupFlatVectorsFormat.META_EXTENSION);
+    try (ChecksumIndexInput meta = 
state.directory.openChecksumInput(metaFileName)) {
+      Throwable priorE = null;
+      try {
+        versionMeta =
+            CodecUtil.checkIndexHeader(
+                meta,
+                DedupFlatVectorsFormat.META_CODEC_NAME,
+                DedupFlatVectorsFormat.VERSION_START,
+                DedupFlatVectorsFormat.VERSION_CURRENT,
+                state.segmentInfo.getId(),
+                state.segmentSuffix);
+        readMetaBody(meta, state.fieldInfos);
+      } catch (Throwable e) {
+        priorE = e;
+        throw e;
+      } finally {
+        CodecUtil.checkFooter(meta, priorE);
+      }
+    }
+
+    FileOpenHint[] hints =
+        Stream.of(FileTypeHint.DATA, FileDataHint.KNN_VECTORS, accessHint)
+            .filter(Objects::nonNull)
+            .toArray(FileOpenHint[]::new);
+    dataContext = state.context.withHints(hints);
+    boolean success = false;
+    IndexInput data = null;
+    try {
+      data = openDataInput(state, versionMeta, dataContext);
+      this.vectorData = data;
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(data);
+      }
+    }
+  }
+
+  private void readMetaBody(ChecksumIndexInput meta, FieldInfos fieldInfos) 
throws IOException {
+    int numPools = meta.readVInt();
+    PoolEntry[] poolsLocal = new PoolEntry[numPools];
+    for (int p = 0; p < numPools; p++) {
+      int dim = meta.readVInt();
+      int encOrd = meta.readByte();
+      VectorEncoding encoding = VectorEncoding.values()[encOrd];
+      long offset = meta.readVLong();
+      long length = meta.readVLong();
+      int uniqueCount = meta.readVInt();
+      poolsLocal[p] = new PoolEntry(p, dim, encoding, offset, length, 
uniqueCount);
+    }
+    for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = 
meta.readInt()) {
+      FieldInfo info = fieldInfos.fieldInfo(fieldNumber);
+      if (info == null) {
+        throw new CorruptIndexException("Invalid field number: " + 
fieldNumber, meta);
+      }
+      int simOrd = meta.readByte();
+      VectorSimilarityFunction sim = VectorSimilarityFunction.values()[simOrd];
+      int poolId = meta.readVInt();
+      if (poolId < 0 || poolId >= poolsLocal.length) {
+        throw new CorruptIndexException(
+            "Invalid poolId " + poolId + " (numPools=" + poolsLocal.length + 
")", meta);
+      }
+      int cardinality = meta.readInt();
+      OrdToDocDISIReaderConfiguration ordToDoc =
+          OrdToDocDISIReaderConfiguration.fromStoredMeta(meta, cardinality);
+      long mapOffset = meta.readVLong();
+      long mapLength = meta.readVLong();
+      int bitsPerVecOrd = meta.readByte();
+      FieldEntry entry =
+          new FieldEntry(
+              info,
+              sim,
+              poolId,
+              cardinality,
+              mapOffset,
+              mapLength,
+              bitsPerVecOrd,
+              ordToDoc,
+              poolsLocal[poolId]);
+      fields.put(info.number, entry);
+    }
+  }
+
+  private static IndexInput openDataInput(
+      SegmentReadState state, int versionMeta, IOContext context) throws 
IOException {
+    String fileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name,
+            state.segmentSuffix,
+            DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION);
+    IndexInput in = state.directory.openInput(fileName, context);
+    boolean success = false;
+    try {
+      int versionVectorData =
+          CodecUtil.checkIndexHeader(
+              in,
+              DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
+              DedupFlatVectorsFormat.VERSION_START,
+              DedupFlatVectorsFormat.VERSION_CURRENT,
+              state.segmentInfo.getId(),
+              state.segmentSuffix);
+      if (versionMeta != versionVectorData) {
+        throw new CorruptIndexException(
+            "Format versions mismatch: meta="
+                + versionMeta
+                + ", "
+                + DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME
+                + "="
+                + versionVectorData,
+            in);
+      }
+      CodecUtil.retrieveChecksum(in);
+      success = true;
+      return in;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(in);
+      }
+    }
+  }
+
+  private FieldEntry getFieldEntry(String fieldName, VectorEncoding expected) {
+    FieldInfo info = fieldInfos.fieldInfo(fieldName);
+    if (info == null) {
+      throw new IllegalArgumentException("field=\"" + fieldName + "\" not 
found");
+    }
+    FieldEntry entry = fields.get(info.number);
+    if (entry == null) {
+      throw new IllegalArgumentException(
+          "field=\"" + fieldName + "\" not encoded by DedupFlatVectorsFormat");
+    }
+    if (entry.pool.encoding != expected) {
+      throw new IllegalArgumentException(
+          "field=\""
+              + fieldName
+              + "\" encoded as "
+              + entry.pool.encoding
+              + " but expected "
+              + expected);
+    }
+    return entry;
+  }
+
+  @Override
+  public FloatVectorValues getFloatVectorValues(String field) throws 
IOException {
+    FieldEntry entry = getFieldEntry(field, VectorEncoding.FLOAT32);
+    return OffHeapDedupFloatVectorValues.create(entry, vectorData, 
vectorScorer);
+  }
+
+  @Override
+  public ByteVectorValues getByteVectorValues(String field) throws IOException 
{
+    FieldEntry entry = getFieldEntry(field, VectorEncoding.BYTE);
+    return OffHeapDedupByteVectorValues.create(entry, vectorData, 
vectorScorer);
+  }
+
+  @Override
+  public FlatVectorsScorer getFlatVectorScorer(String field) throws 
IOException {
+    // Used by Lucene99HnswVectorsWriter at merge time: it asks for a scorer 
over our flat
+    // values, then builds a RandomVectorScorerSupplier from it. The 
translating wrapper
+    // detects our OffHeapDedup* values, extracts the pool view, builds an 
off-heap SIMD
+    // supplier over the pool, and translates docOrd → vecOrd on every scoring 
call.
+    return translatingScorer;
+  }
+
+  @Override
+  public RandomVectorScorer getRandomVectorScorer(String field, float[] 
target) throws IOException {
+    FieldEntry entry = getFieldEntry(field, VectorEncoding.FLOAT32);
+    if (entry.cardinality == 0) {
+      return null;
+    }
+    OffHeapDedupFloatVectorValues fieldView =
+        OffHeapDedupFloatVectorValues.create(entry, vectorData, vectorScorer);
+    KnnVectorValues poolView = fieldView.poolView();
+    RandomVectorScorer poolScorer =
+        vectorScorer.getRandomVectorScorer(entry.similarity, poolView, target);
+    int[] docOrdToVecOrd = entry.getOrLoadDocOrdToVecOrd(vectorData);
+    return DedupTranslatingScorer.wrapScorer(poolScorer, fieldView, 
docOrdToVecOrd);
+  }
+
+  @Override
+  public RandomVectorScorer getRandomVectorScorer(String field, byte[] target) 
throws IOException {
+    FieldEntry entry = getFieldEntry(field, VectorEncoding.BYTE);
+    if (entry.cardinality == 0) {
+      return null;
+    }
+    OffHeapDedupByteVectorValues fieldView =
+        OffHeapDedupByteVectorValues.create(entry, vectorData, vectorScorer);
+    KnnVectorValues poolView = fieldView.poolView();
+    RandomVectorScorer poolScorer =
+        vectorScorer.getRandomVectorScorer(entry.similarity, poolView, target);
+    int[] docOrdToVecOrd = entry.getOrLoadDocOrdToVecOrd(vectorData);
+    return DedupTranslatingScorer.wrapScorer(poolScorer, fieldView, 
docOrdToVecOrd);
+  }
+
+  @Override
+  public Map<String, Long> getOffHeapByteSize(FieldInfo fieldInfo) {
+    FieldEntry entry = fields.get(fieldInfo.number);
+    if (entry == null) {
+      return Map.of();
+    }
+    return Map.of(
+        DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION, entry.pool.length + 
entry.mapLength);
+  }
+
+  @Override
+  public void checkIntegrity() throws IOException {
+    CodecUtil.checksumEntireFile(vectorData);
+  }
+
+  @Override
+  public FlatVectorsReader getMergeInstance() throws IOException {
+    
vectorData.updateIOContext(dataContext.withHints(DataAccessHint.SEQUENTIAL));
+    return this;
+  }
+
+  @Override
+  public void finishMerge() throws IOException {
+    vectorData.updateIOContext(dataContext);
+  }
+
+  @Override
+  public void close() throws IOException {
+    IOUtils.close(vectorData);
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return SHALLOW_SIZE + fields.ramBytesUsed();
+  }
+
+  /** Test-only: look up a field entry by name. Package-private. */
+  FieldEntry getFieldEntryForTesting(String fieldName) {
+    FieldInfo info = fieldInfos.fieldInfo(fieldName);
+    if (info == null) {
+      throw new IllegalArgumentException("no such field " + fieldName);
+    }
+    return fields.get(info.number);
+  }
+
+  /** Per-pool runtime state. */
+  static final class PoolEntry {
+    final int poolId;
+    final int dim;
+    final VectorEncoding encoding;
+    final long offset;
+    final long length;
+    final int uniqueCount;
+    final int byteSize;
+
+    PoolEntry(
+        int poolId, int dim, VectorEncoding encoding, long offset, long 
length, int uniqueCount) {
+      this.poolId = poolId;
+      this.dim = dim;
+      this.encoding = encoding;
+      this.offset = offset;
+      this.length = length;
+      this.uniqueCount = uniqueCount;
+      this.byteSize = dim * encoding.byteSize;
+    }
+  }
+
+  /** Per-field runtime state. */
+  static final class FieldEntry {
+    final FieldInfo info;
+    final VectorSimilarityFunction similarity;
+    final int poolId;
+    final int cardinality;
+    final long mapOffset;
+    final long mapLength;
+    final int bitsPerVecOrd;
+    final OrdToDocDISIReaderConfiguration ordToDoc;
+    final PoolEntry pool;
+
+    /**
+     * Lazily-materialised dense {@code docOrd → vecOrd} array. Loaded on 
first scorer build so the
+     * hot HNSW build/search loop translates with a single primitive int[] 
read instead of
+     * dispatching through {@link DirectReader}'s {@link LongValues}. Memory 
footprint: 4 ×
+     * cardinality bytes per field (e.g. ~4 MB for 1M docs).
+     */
+    private volatile int[] docOrdToVecOrdArray;
+
+    FieldEntry(
+        FieldInfo info,
+        VectorSimilarityFunction similarity,
+        int poolId,
+        int cardinality,
+        long mapOffset,
+        long mapLength,
+        int bitsPerVecOrd,
+        OrdToDocDISIReaderConfiguration ordToDoc,
+        PoolEntry pool) {
+      this.info = info;
+      this.similarity = similarity;
+      this.poolId = poolId;
+      this.cardinality = cardinality;
+      this.mapOffset = mapOffset;
+      this.mapLength = mapLength;
+      this.bitsPerVecOrd = bitsPerVecOrd;
+      this.ordToDoc = ordToDoc;
+      this.pool = pool;
+    }
+
+    int[] getOrLoadDocOrdToVecOrd(IndexInput vectorData) throws IOException {
+      int[] cached = docOrdToVecOrdArray;
+      if (cached != null) {
+        return cached;
+      }
+      synchronized (this) {
+        cached = docOrdToVecOrdArray;
+        if (cached != null) {
+          return cached;
+        }
+        cached = loadDocOrdToVecOrd(vectorData);
+        docOrdToVecOrdArray = cached;
+        return cached;
+      }
+    }
+
+    private int[] loadDocOrdToVecOrd(IndexInput vectorData) throws IOException 
{
+      int[] arr = new int[cardinality];

Review Comment:
   Oh, `cardinality` is maybe actually `maxDoc`?  Or is it the number of docs
that have a vector in this segment?  `cardinality` is tricky.  It's a synonym
for "number", or "count", ...



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsFormat.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import java.io.IOException;
+import org.apache.lucene.codecs.hnsw.FlatVectorScorerUtil;
+import org.apache.lucene.codecs.hnsw.FlatVectorsFormat;
+import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene90.IndexedDISI;
+import org.apache.lucene.index.SegmentReadState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * Flat vector format that stores each unique vector exactly once per segment. 
Multiple documents
+ * (within the same field, and across fields with matching {@code (dimension, 
encoding)}) may share
+ * the same physical vector, dramatically reducing on-disk size when many 
docs/fields point at the
+ * same vector data.
+ *
+ * <p>The format groups fields into "pools" keyed by {@code (dimension, 
encoding)}. All fields in
+ * the same pool share unique-vector storage; per-field metadata records how 
each doc-ord maps to a

Review Comment:
   Oh!  This is `VectorEncoding` -- `byte[]` vs `float[]` input vectors, right?
Cool, so this works for both!



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsReader.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Stream;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.SegmentReadState;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.internal.hppc.IntObjectHashMap;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.DataAccessHint;
+import org.apache.lucene.store.FileDataHint;
+import org.apache.lucene.store.FileTypeHint;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IOContext.FileOpenHint;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RandomAccessInput;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LongValues;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.hnsw.RandomVectorScorer;
+import org.apache.lucene.util.packed.DirectReader;
+
+/**
+ * Reads a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p>Layout: {@code .dvc} holds pool vector bytes (contiguous per pool) 
followed by per-field DISI
+ * + packed {@code docOrd → vecOrd} maps. {@code .dvm} holds pool/field 
metadata.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsReader extends FlatVectorsReader {
+
+  private static final long SHALLOW_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsReader.class);
+
+  private final FlatVectorsScorer vectorScorer;
+  private final FlatVectorsScorer translatingScorer;
+  private final FieldInfos fieldInfos;
+  private final IndexInput vectorData;
+  private final IOContext dataContext;
+
+  private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>();
+
+  public DedupFlatVectorsReader(SegmentReadState state, FlatVectorsScorer 
scorer)
+      throws IOException {
+    this(state, scorer, DataAccessHint.RANDOM);

Review Comment:
   Does this format implement prefetch?  Maybe add a `// TODO` and/or open a
follow-on issue once we merge?  Since this format could "amplify" the randomness
(the deref to the original ordinal), prefetch is maybe even more important here
than for the non-dup-detecting formats.



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsReader.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Stream;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.SegmentReadState;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.internal.hppc.IntObjectHashMap;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.DataAccessHint;
+import org.apache.lucene.store.FileDataHint;
+import org.apache.lucene.store.FileTypeHint;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IOContext.FileOpenHint;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RandomAccessInput;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LongValues;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.hnsw.RandomVectorScorer;
+import org.apache.lucene.util.packed.DirectReader;
+
+/**
+ * Reads a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p>Layout: {@code .dvc} holds pool vector bytes (contiguous per pool) 
followed by per-field DISI
+ * + packed {@code docOrd → vecOrd} maps. {@code .dvm} holds pool/field 
metadata.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsReader extends FlatVectorsReader {
+
+  private static final long SHALLOW_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsReader.class);
+
+  private final FlatVectorsScorer vectorScorer;
+  private final FlatVectorsScorer translatingScorer;
+  private final FieldInfos fieldInfos;
+  private final IndexInput vectorData;
+  private final IOContext dataContext;
+
+  private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>();
+
+  public DedupFlatVectorsReader(SegmentReadState state, FlatVectorsScorer 
scorer)
+      throws IOException {
+    this(state, scorer, DataAccessHint.RANDOM);
+  }
+
+  public DedupFlatVectorsReader(
+      SegmentReadState state, FlatVectorsScorer scorer, DataAccessHint 
accessHint)
+      throws IOException {
+    this.vectorScorer = scorer;
+    this.translatingScorer = new DedupTranslatingScorer(scorer);
+    this.fieldInfos = state.fieldInfos;
+
+    int versionMeta;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, 
DedupFlatVectorsFormat.META_EXTENSION);
+    try (ChecksumIndexInput meta = 
state.directory.openChecksumInput(metaFileName)) {
+      Throwable priorE = null;
+      try {
+        versionMeta =
+            CodecUtil.checkIndexHeader(
+                meta,
+                DedupFlatVectorsFormat.META_CODEC_NAME,
+                DedupFlatVectorsFormat.VERSION_START,
+                DedupFlatVectorsFormat.VERSION_CURRENT,
+                state.segmentInfo.getId(),
+                state.segmentSuffix);
+        readMetaBody(meta, state.fieldInfos);
+      } catch (Throwable e) {
+        priorE = e;
+        throw e;
+      } finally {
+        CodecUtil.checkFooter(meta, priorE);
+      }
+    }
+
+    FileOpenHint[] hints =
+        Stream.of(FileTypeHint.DATA, FileDataHint.KNN_VECTORS, accessHint)
+            .filter(Objects::nonNull)
+            .toArray(FileOpenHint[]::new);
+    dataContext = state.context.withHints(hints);
+    boolean success = false;
+    IndexInput data = null;
+    try {
+      data = openDataInput(state, versionMeta, dataContext);
+      this.vectorData = data;
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(data);
+      }
+    }
+  }
+
+  private void readMetaBody(ChecksumIndexInput meta, FieldInfos fieldInfos) 
throws IOException {
+    int numPools = meta.readVInt();
+    PoolEntry[] poolsLocal = new PoolEntry[numPools];
+    for (int p = 0; p < numPools; p++) {
+      int dim = meta.readVInt();
+      int encOrd = meta.readByte();
+      VectorEncoding encoding = VectorEncoding.values()[encOrd];
+      long offset = meta.readVLong();
+      long length = meta.readVLong();
+      int uniqueCount = meta.readVInt();
+      poolsLocal[p] = new PoolEntry(p, dim, encoding, offset, length, 
uniqueCount);
+    }
+    for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = 
meta.readInt()) {
+      FieldInfo info = fieldInfos.fieldInfo(fieldNumber);
+      if (info == null) {
+        throw new CorruptIndexException("Invalid field number: " + 
fieldNumber, meta);
+      }
+      int simOrd = meta.readByte();
+      VectorSimilarityFunction sim = VectorSimilarityFunction.values()[simOrd];
+      int poolId = meta.readVInt();
+      if (poolId < 0 || poolId >= poolsLocal.length) {
+        throw new CorruptIndexException(
+            "Invalid poolId " + poolId + " (numPools=" + poolsLocal.length + 
")", meta);
+      }
+      int cardinality = meta.readInt();

Review Comment:
   Is this the number of unique vectors?  Maybe rename to `numVectors`?



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsReader.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Stream;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.SegmentReadState;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.internal.hppc.IntObjectHashMap;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.DataAccessHint;
+import org.apache.lucene.store.FileDataHint;
+import org.apache.lucene.store.FileTypeHint;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IOContext.FileOpenHint;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RandomAccessInput;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LongValues;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.hnsw.RandomVectorScorer;
+import org.apache.lucene.util.packed.DirectReader;
+
+/**
+ * Reads a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p>Layout: {@code .dvc} holds pool vector bytes (contiguous per pool) 
followed by per-field DISI
+ * + packed {@code docOrd → vecOrd} maps. {@code .dvm} holds pool/field 
metadata.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsReader extends FlatVectorsReader {
+
+  private static final long SHALLOW_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsReader.class);
+
+  private final FlatVectorsScorer vectorScorer;
+  private final FlatVectorsScorer translatingScorer;
+  private final FieldInfos fieldInfos;
+  private final IndexInput vectorData;
+  private final IOContext dataContext;
+
+  private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>();
+
+  public DedupFlatVectorsReader(SegmentReadState state, FlatVectorsScorer 
scorer)
+      throws IOException {
+    this(state, scorer, DataAccessHint.RANDOM);
+  }
+
+  public DedupFlatVectorsReader(
+      SegmentReadState state, FlatVectorsScorer scorer, DataAccessHint 
accessHint)
+      throws IOException {
+    this.vectorScorer = scorer;
+    this.translatingScorer = new DedupTranslatingScorer(scorer);
+    this.fieldInfos = state.fieldInfos;
+
+    int versionMeta;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, 
DedupFlatVectorsFormat.META_EXTENSION);
+    try (ChecksumIndexInput meta = 
state.directory.openChecksumInput(metaFileName)) {
+      Throwable priorE = null;
+      try {
+        versionMeta =
+            CodecUtil.checkIndexHeader(
+                meta,
+                DedupFlatVectorsFormat.META_CODEC_NAME,
+                DedupFlatVectorsFormat.VERSION_START,
+                DedupFlatVectorsFormat.VERSION_CURRENT,
+                state.segmentInfo.getId(),
+                state.segmentSuffix);
+        readMetaBody(meta, state.fieldInfos);
+      } catch (Throwable e) {
+        priorE = e;
+        throw e;
+      } finally {
+        CodecUtil.checkFooter(meta, priorE);
+      }
+    }
+
+    FileOpenHint[] hints =
+        Stream.of(FileTypeHint.DATA, FileDataHint.KNN_VECTORS, accessHint)
+            .filter(Objects::nonNull)
+            .toArray(FileOpenHint[]::new);
+    dataContext = state.context.withHints(hints);
+    boolean success = false;
+    IndexInput data = null;
+    try {
+      data = openDataInput(state, versionMeta, dataContext);
+      this.vectorData = data;
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(data);
+      }
+    }
+  }
+
+  private void readMetaBody(ChecksumIndexInput meta, FieldInfos fieldInfos) 
throws IOException {
+    int numPools = meta.readVInt();
+    PoolEntry[] poolsLocal = new PoolEntry[numPools];
+    for (int p = 0; p < numPools; p++) {
+      int dim = meta.readVInt();
+      int encOrd = meta.readByte();
+      VectorEncoding encoding = VectorEncoding.values()[encOrd];
+      long offset = meta.readVLong();
+      long length = meta.readVLong();
+      int uniqueCount = meta.readVInt();
+      poolsLocal[p] = new PoolEntry(p, dim, encoding, offset, length, 
uniqueCount);
+    }
+    for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = 
meta.readInt()) {
+      FieldInfo info = fieldInfos.fieldInfo(fieldNumber);
+      if (info == null) {
+        throw new CorruptIndexException("Invalid field number: " + 
fieldNumber, meta);
+      }
+      int simOrd = meta.readByte();
+      VectorSimilarityFunction sim = VectorSimilarityFunction.values()[simOrd];
+      int poolId = meta.readVInt();
+      if (poolId < 0 || poolId >= poolsLocal.length) {
+        throw new CorruptIndexException(
+            "Invalid poolId " + poolId + " (numPools=" + poolsLocal.length + 
")", meta);
+      }
+      int cardinality = meta.readInt();
+      OrdToDocDISIReaderConfiguration ordToDoc =
+          OrdToDocDISIReaderConfiguration.fromStoredMeta(meta, cardinality);
+      long mapOffset = meta.readVLong();
+      long mapLength = meta.readVLong();
+      int bitsPerVecOrd = meta.readByte();
+      FieldEntry entry =
+          new FieldEntry(
+              info,
+              sim,
+              poolId,
+              cardinality,
+              mapOffset,
+              mapLength,
+              bitsPerVecOrd,
+              ordToDoc,
+              poolsLocal[poolId]);
+      fields.put(info.number, entry);
+    }
+  }
+
+  private static IndexInput openDataInput(
+      SegmentReadState state, int versionMeta, IOContext context) throws 
IOException {
+    String fileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name,
+            state.segmentSuffix,
+            DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION);
+    IndexInput in = state.directory.openInput(fileName, context);
+    boolean success = false;
+    try {
+      int versionVectorData =
+          CodecUtil.checkIndexHeader(
+              in,
+              DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
+              DedupFlatVectorsFormat.VERSION_START,
+              DedupFlatVectorsFormat.VERSION_CURRENT,
+              state.segmentInfo.getId(),
+              state.segmentSuffix);
+      if (versionMeta != versionVectorData) {
+        throw new CorruptIndexException(
+            "Format versions mismatch: meta="
+                + versionMeta
+                + ", "
+                + DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME
+                + "="
+                + versionVectorData,
+            in);
+      }
+      CodecUtil.retrieveChecksum(in);
+      success = true;
+      return in;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(in);
+      }
+    }
+  }
+
+  private FieldEntry getFieldEntry(String fieldName, VectorEncoding expected) {
+    FieldInfo info = fieldInfos.fieldInfo(fieldName);
+    if (info == null) {
+      throw new IllegalArgumentException("field=\"" + fieldName + "\" not 
found");
+    }
+    FieldEntry entry = fields.get(info.number);
+    if (entry == null) {
+      throw new IllegalArgumentException(
+          "field=\"" + fieldName + "\" not encoded by DedupFlatVectorsFormat");
+    }
+    if (entry.pool.encoding != expected) {
+      throw new IllegalArgumentException(
+          "field=\""
+              + fieldName
+              + "\" encoded as "

Review Comment:
   Maybe `indexed from byte/float vector` instead of `encoded as`?



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsReader.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Stream;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.SegmentReadState;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.internal.hppc.IntObjectHashMap;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.DataAccessHint;
+import org.apache.lucene.store.FileDataHint;
+import org.apache.lucene.store.FileTypeHint;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IOContext.FileOpenHint;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RandomAccessInput;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LongValues;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.hnsw.RandomVectorScorer;
+import org.apache.lucene.util.packed.DirectReader;
+
+/**
+ * Reads a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p>Layout: {@code .dvc} holds pool vector bytes (contiguous per pool) 
followed by per-field DISI
+ * + packed {@code docOrd → vecOrd} maps. {@code .dvm} holds pool/field 
metadata.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsReader extends FlatVectorsReader {
+
+  private static final long SHALLOW_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsReader.class);
+
+  private final FlatVectorsScorer vectorScorer;
+  private final FlatVectorsScorer translatingScorer;
+  private final FieldInfos fieldInfos;
+  private final IndexInput vectorData;
+  private final IOContext dataContext;
+
+  private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>();
+
+  public DedupFlatVectorsReader(SegmentReadState state, FlatVectorsScorer 
scorer)
+      throws IOException {
+    this(state, scorer, DataAccessHint.RANDOM);
+  }
+
+  public DedupFlatVectorsReader(
+      SegmentReadState state, FlatVectorsScorer scorer, DataAccessHint 
accessHint)
+      throws IOException {
+    this.vectorScorer = scorer;
+    this.translatingScorer = new DedupTranslatingScorer(scorer);
+    this.fieldInfos = state.fieldInfos;
+
+    int versionMeta;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, 
DedupFlatVectorsFormat.META_EXTENSION);
+    try (ChecksumIndexInput meta = 
state.directory.openChecksumInput(metaFileName)) {
+      Throwable priorE = null;
+      try {
+        versionMeta =
+            CodecUtil.checkIndexHeader(
+                meta,
+                DedupFlatVectorsFormat.META_CODEC_NAME,
+                DedupFlatVectorsFormat.VERSION_START,
+                DedupFlatVectorsFormat.VERSION_CURRENT,
+                state.segmentInfo.getId(),
+                state.segmentSuffix);
+        readMetaBody(meta, state.fieldInfos);
+      } catch (Throwable e) {
+        priorE = e;
+        throw e;
+      } finally {
+        CodecUtil.checkFooter(meta, priorE);
+      }
+    }
+
+    FileOpenHint[] hints =
+        Stream.of(FileTypeHint.DATA, FileDataHint.KNN_VECTORS, accessHint)
+            .filter(Objects::nonNull)
+            .toArray(FileOpenHint[]::new);
+    dataContext = state.context.withHints(hints);
+    boolean success = false;
+    IndexInput data = null;
+    try {
+      data = openDataInput(state, versionMeta, dataContext);
+      this.vectorData = data;
+      success = true;
+    } finally {
+      if (!success) {

Review Comment:
   Use `success == false` instead of `!success`?  That reduces the chance of
future refactoring bugs, by humans at least ...



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsReader.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Stream;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.SegmentReadState;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.internal.hppc.IntObjectHashMap;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.DataAccessHint;
+import org.apache.lucene.store.FileDataHint;
+import org.apache.lucene.store.FileTypeHint;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IOContext.FileOpenHint;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RandomAccessInput;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LongValues;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.hnsw.RandomVectorScorer;
+import org.apache.lucene.util.packed.DirectReader;
+
+/**
+ * Reads a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p>Layout: {@code .dvc} holds pool vector bytes (contiguous per pool) 
followed by per-field DISI
+ * + packed {@code docOrd → vecOrd} maps. {@code .dvm} holds pool/field 
metadata.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsReader extends FlatVectorsReader {
+
+  private static final long SHALLOW_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsReader.class);
+
+  private final FlatVectorsScorer vectorScorer;
+  private final FlatVectorsScorer translatingScorer;
+  private final FieldInfos fieldInfos;
+  private final IndexInput vectorData;
+  private final IOContext dataContext;
+
+  private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>();
+
+  public DedupFlatVectorsReader(SegmentReadState state, FlatVectorsScorer 
scorer)
+      throws IOException {
+    this(state, scorer, DataAccessHint.RANDOM);
+  }
+
+  public DedupFlatVectorsReader(
+      SegmentReadState state, FlatVectorsScorer scorer, DataAccessHint 
accessHint)
+      throws IOException {
+    this.vectorScorer = scorer;
+    this.translatingScorer = new DedupTranslatingScorer(scorer);
+    this.fieldInfos = state.fieldInfos;
+
+    int versionMeta;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, 
DedupFlatVectorsFormat.META_EXTENSION);
+    try (ChecksumIndexInput meta = 
state.directory.openChecksumInput(metaFileName)) {
+      Throwable priorE = null;
+      try {
+        versionMeta =
+            CodecUtil.checkIndexHeader(
+                meta,
+                DedupFlatVectorsFormat.META_CODEC_NAME,
+                DedupFlatVectorsFormat.VERSION_START,
+                DedupFlatVectorsFormat.VERSION_CURRENT,
+                state.segmentInfo.getId(),
+                state.segmentSuffix);
+        readMetaBody(meta, state.fieldInfos);
+      } catch (Throwable e) {
+        priorE = e;
+        throw e;
+      } finally {
+        CodecUtil.checkFooter(meta, priorE);
+      }
+    }
+
+    FileOpenHint[] hints =
+        Stream.of(FileTypeHint.DATA, FileDataHint.KNN_VECTORS, accessHint)
+            .filter(Objects::nonNull)
+            .toArray(FileOpenHint[]::new);
+    dataContext = state.context.withHints(hints);
+    boolean success = false;
+    IndexInput data = null;
+    try {
+      data = openDataInput(state, versionMeta, dataContext);
+      this.vectorData = data;
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(data);
+      }
+    }
+  }
+
+  private void readMetaBody(ChecksumIndexInput meta, FieldInfos fieldInfos) 
throws IOException {
+    int numPools = meta.readVInt();
+    PoolEntry[] poolsLocal = new PoolEntry[numPools];
+    for (int p = 0; p < numPools; p++) {
+      int dim = meta.readVInt();
+      int encOrd = meta.readByte();
+      VectorEncoding encoding = VectorEncoding.values()[encOrd];
+      long offset = meta.readVLong();
+      long length = meta.readVLong();
+      int uniqueCount = meta.readVInt();
+      poolsLocal[p] = new PoolEntry(p, dim, encoding, offset, length, 
uniqueCount);
+    }
+    for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = 
meta.readInt()) {
+      FieldInfo info = fieldInfos.fieldInfo(fieldNumber);
+      if (info == null) {
+        throw new CorruptIndexException("Invalid field number: " + 
fieldNumber, meta);
+      }
+      int simOrd = meta.readByte();
+      VectorSimilarityFunction sim = VectorSimilarityFunction.values()[simOrd];
+      int poolId = meta.readVInt();
+      if (poolId < 0 || poolId >= poolsLocal.length) {
+        throw new CorruptIndexException(
+            "Invalid poolId " + poolId + " (numPools=" + poolsLocal.length + 
")", meta);
+      }
+      int cardinality = meta.readInt();
+      OrdToDocDISIReaderConfiguration ordToDoc =
+          OrdToDocDISIReaderConfiguration.fromStoredMeta(meta, cardinality);
+      long mapOffset = meta.readVLong();
+      long mapLength = meta.readVLong();
+      int bitsPerVecOrd = meta.readByte();
+      FieldEntry entry =
+          new FieldEntry(
+              info,
+              sim,
+              poolId,
+              cardinality,
+              mapOffset,
+              mapLength,
+              bitsPerVecOrd,
+              ordToDoc,
+              poolsLocal[poolId]);
+      fields.put(info.number, entry);
+    }
+  }
+
+  private static IndexInput openDataInput(
+      SegmentReadState state, int versionMeta, IOContext context) throws 
IOException {
+    String fileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name,
+            state.segmentSuffix,
+            DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION);
+    IndexInput in = state.directory.openInput(fileName, context);
+    boolean success = false;
+    try {
+      int versionVectorData =
+          CodecUtil.checkIndexHeader(
+              in,
+              DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
+              DedupFlatVectorsFormat.VERSION_START,
+              DedupFlatVectorsFormat.VERSION_CURRENT,
+              state.segmentInfo.getId(),
+              state.segmentSuffix);
+      if (versionMeta != versionVectorData) {
+        throw new CorruptIndexException(
+            "Format versions mismatch: meta="
+                + versionMeta
+                + ", "
+                + DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME
+                + "="
+                + versionVectorData,
+            in);
+      }
+      CodecUtil.retrieveChecksum(in);
+      success = true;
+      return in;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(in);
+      }
+    }
+  }
+
+  private FieldEntry getFieldEntry(String fieldName, VectorEncoding expected) {
+    FieldInfo info = fieldInfos.fieldInfo(fieldName);
+    if (info == null) {
+      throw new IllegalArgumentException("field=\"" + fieldName + "\" not 
found");
+    }
+    FieldEntry entry = fields.get(info.number);
+    if (entry == null) {
+      throw new IllegalArgumentException(
+          "field=\"" + fieldName + "\" not encoded by DedupFlatVectorsFormat");

Review Comment:
   Maybe `was not indexed by DedupFlatVectorsFormat in segment X`?



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import static 
org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.KnnVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocsWithFieldSet;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.DirectWriter;
+
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, 
with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous 
unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link 
DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector 
to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be 
byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into 
{@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment 
(sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the 
merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, 
sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc 
loop becomes two int
+ * reads with no hash + no byte-verify. Lazy materialisation ensures source 
uniques whose docs are

Review Comment:
   `source uniques` -> `any source vector whose doc(s) are all deleted is never 
copied into the merged pool`?



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsFormat.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import java.io.IOException;
+import org.apache.lucene.codecs.hnsw.FlatVectorScorerUtil;
+import org.apache.lucene.codecs.hnsw.FlatVectorsFormat;
+import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene90.IndexedDISI;
+import org.apache.lucene.index.SegmentReadState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * Flat vector format that stores each unique vector exactly once per segment. 
Multiple documents
+ * (within the same field, and across fields with matching {@code (dimension, 
encoding)}) may share
+ * the same physical vector, dramatically reducing on-disk size when many 
docs/fields point at the
+ * same vector data.
+ *
+ * <p>The format groups fields into "pools" keyed by {@code (dimension, 
encoding)}. All fields in

Review Comment:
   How about `organizes fields into groups with the same dimension and input 
type (byte/float)`?
   
   Edit: actually I think `pools` is OK here?  It's used throughout ... and it 
grows on you (well, me at least) as I see it in situ.  It reminds me of what ZFS 
calls "pools" too.



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import static 
org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.KnnVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocsWithFieldSet;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.DirectWriter;
+
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, 
with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous 
unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link 
DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector 
to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be 
byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into 
{@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment 
(sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the 
merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, 
sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc 
loop becomes two int
+ * reads with no hash + no byte-verify. Lazy materialisation ensures source 
uniques whose docs are
+ * all deleted are never copied into the merged pool.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsWriter extends FlatVectorsWriter {
+
+  private static final long SHALLOW_RAM_BYTES_USED =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class);
+
+  private final SegmentWriteState segmentWriteState;
+  private final IndexOutput meta;
+  private final IndexOutput vectorData;
+
+  // Pools used during flush — keyed by (dim, encoding); insertion-order 
preserved as pool id.
+  private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>();
+  private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>();
+
+  // Pools used during merge.
+  private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>();

Review Comment:
   Hmm we also allocate these in the flush case?  They just remain empty?  
Maybe we need to / should factor these out into a dedicated merging utility 
class?  Not sure.



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import static 
org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.KnnVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocsWithFieldSet;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.DirectWriter;
+
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, 
with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous 
unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link 
DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector 
to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be 
byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into 
{@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment 
(sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the 
merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, 
sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc 
loop becomes two int
+ * reads with no hash + no byte-verify. Lazy materialisation ensures source 
uniques whose docs are
+ * all deleted are never copied into the merged pool.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsWriter extends FlatVectorsWriter {
+
+  private static final long SHALLOW_RAM_BYTES_USED =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class);
+
+  private final SegmentWriteState segmentWriteState;
+  private final IndexOutput meta;
+  private final IndexOutput vectorData;
+
+  // Pools used during flush — keyed by (dim, encoding); insertion-order 
preserved as pool id.
+  private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>();
+  private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>();
+
+  // Pools used during merge.
+  private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>();
+  private final List<MergeFieldState> mergeFields = new ArrayList<>();
+
+  private boolean finished;
+  private boolean usedForMerge;
+
+  DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) 
throws IOException {
+    super(scorer);
+    this.segmentWriteState = state;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, 
DedupFlatVectorsFormat.META_EXTENSION);
+    String vectorDataFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name,
+            state.segmentSuffix,
+            DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION);
+
+    boolean success = false;
+    IndexOutput m = null, v = null;
+    try {
+      m = state.directory.createOutput(metaFileName, state.context);
+      v = state.directory.createOutput(vectorDataFileName, state.context);
+      CodecUtil.writeIndexHeader(
+          m,
+          DedupFlatVectorsFormat.META_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      CodecUtil.writeIndexHeader(
+          v,
+          DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      this.meta = m;
+      this.vectorData = v;
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(m, v);
+      }
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Indexing path (flush)
+  // 
---------------------------------------------------------------------------
+
+  @Override
+  public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws 
IOException {
+    PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), 
fieldInfo.getVectorEncoding());
+    FlushPool pool = flushPools.computeIfAbsent(key, FlushPool::new);
+    FlushFieldWriter<?> w =
+        switch (fieldInfo.getVectorEncoding()) {
+          case BYTE -> new FlushFieldWriter.ByteImpl(fieldInfo, pool);
+          case FLOAT32 -> new FlushFieldWriter.FloatImpl(fieldInfo, pool);
+        };
+    flushFields.add(w);
+    return w;
+  }
+
+  @Override
+  public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException {
+    if (usedForMerge) {
+      throw new IllegalStateException(
+          "DedupFlatVectorsWriter cannot be used for both flush and merge in 
the same instance");
+    }
+    // 1. Write each pool's contiguous unique-vector bytes to .dvc.
+    long[] poolOffsets = new long[flushPools.size()];
+    long[] poolLengths = new long[flushPools.size()];
+    assignPoolIds(flushPools.values());
+    int p = 0;
+    for (FlushPool pool : flushPools.values()) {
+      long start = alignOutput(vectorData, pool.key.encoding);
+      pool.writeBytes(vectorData);
+      poolOffsets[p] = start;
+      poolLengths[p] = vectorData.getFilePointer() - start;
+      p++;
+    }
+    // 2. Pool metadata.
+    writePoolsMeta(flushPools.values(), poolOffsets, poolLengths);
+    // 3. Per-field DISI + map + meta record.
+    for (FlushFieldWriter<?> field : flushFields) {
+      writeFlushField(field, maxDoc, sortMap);
+      field.finish();
+    }
+  }
+
+  private void assignPoolIds(Iterable<? extends Pool> pools) {
+    int n = 0;
+    for (Pool pool : pools) {
+      pool.poolId = n++;
+    }
+  }
+
+  private void writePoolsMeta(
+      Iterable<? extends Pool> orderedPools, long[] poolOffsets, long[] 
poolLengths)
+      throws IOException {
+    int numPools = poolOffsets.length;
+    meta.writeVInt(numPools);
+    int p = 0;
+    for (Pool pool : orderedPools) {
+      meta.writeVInt(pool.key.dim);
+      meta.writeByte((byte) pool.key.encoding.ordinal());
+      meta.writeVLong(poolOffsets[p]);
+      meta.writeVLong(poolLengths[p]);
+      meta.writeVInt(pool.uniqueCount());
+      p++;
+    }
+  }
+
+  private void writeFlushField(FlushFieldWriter<?> field, int maxDoc, 
Sorter.DocMap sortMap)
+      throws IOException {
+    int cardinality = field.docsWithField.cardinality();
+    DocsWithFieldSet docsToWrite = field.docsWithField;
+    int[] mapValues = new int[cardinality];
+    if (sortMap == null || cardinality == 0) {
+      for (int i = 0; i < cardinality; i++) {
+        mapValues[i] = field.docOrdToVecOrd.get(i);
+      }
+    } else {
+      int[] old2New = new int[cardinality];
+      int[] new2Old = new int[cardinality];
+      DocsWithFieldSet sortedSet = new DocsWithFieldSet();
+      KnnVectorsWriter.mapOldOrdToNewOrd(field.docsWithField, sortMap, 
old2New, new2Old, sortedSet);
+      for (int newOrd = 0; newOrd < cardinality; newOrd++) {
+        mapValues[newOrd] = field.docOrdToVecOrd.get(new2Old[newOrd]);
+      }
+      docsToWrite = sortedSet;
+    }
+    writeFieldRecord(field.fieldInfo, field.pool, maxDoc, docsToWrite, 
mapValues);
+  }
+
+  /**
+   * Emit one field's DISI + packed map to {@code .dvc} and one field meta 
record to {@code .dvm}.
+   * Meta-record format (matching read order in {@link 
DedupFlatVectorsReader.FieldEntry}):
+   *
+   * <pre>
+   *   [int]   fieldNumber
+   *   [byte]  similarityOrdinal
+   *   [vint]  poolId
+   *   [int]   cardinality
+   *   ... OrdToDocDISIReaderConfiguration meta (writes inline) ...
+   *   [vlong] mapOffset
+   *   [vlong] mapLength
+   *   [byte]  bitsPerVecOrd
+   * </pre>
+   */
+  private void writeFieldRecord(
+      FieldInfo fieldInfo,
+      Pool pool,
+      int maxDoc,
+      DocsWithFieldSet docsWithField,
+      int[] docOrdToVecOrd)
+      throws IOException {
+    int cardinality = docsWithField.cardinality();
+    meta.writeInt(fieldInfo.number);
+    meta.writeByte((byte) fieldInfo.getVectorSimilarityFunction().ordinal());
+    meta.writeVInt(pool.poolId);
+    meta.writeInt(cardinality);
+    OrdToDocDISIReaderConfiguration.writeStoredMeta(
+        DIRECT_MONOTONIC_BLOCK_SHIFT, meta, vectorData, cardinality, maxDoc, 
docsWithField);
+    long mapOffset = vectorData.getFilePointer();
+    int bitsPerVecOrd;
+    if (cardinality == 0) {
+      bitsPerVecOrd = 1;
+    } else {
+      int maxVecOrd = Math.max(0, pool.uniqueCount() - 1);

Review Comment:
   `pool.vectorCount()`?



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import static 
org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.KnnVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocsWithFieldSet;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.DirectWriter;
+
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, 
with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous 
unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link 
DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector 
to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be 
byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into 
{@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment 
(sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the 
merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, 
sourceReader) pair and

Review Comment:
   I think it's optimizing the common case (merging N segments, all of which use 
this same format)?
   
   In which case I would call it "Step 1" instead of "Level A": Step 1 is to 
merge the pools, i.e. `sourceVecOrd -> mergedVecOrd`.  Then "Step 2" is nicely 
just looking up the `int -> int` remap for the vector ordinals for each doc.  
Similar to how docid is rewritten during postings merge to map around 
deletions.  Very clean/fast!  And the laziness is clever too: we don't copy over 
a vector from the pool if it doesn't appear in the merged segment (none of the 
remaining live docs have it).



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import static 
org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.KnnVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocsWithFieldSet;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.DirectWriter;
+
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, 
with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous 
unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link 
DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector 
to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be 
byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into 
{@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment 
(sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the 
merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, 
sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc 
loop becomes two int
+ * reads with no hash + no byte-verify. Lazy materialisation ensures source 
uniques whose docs are
+ * all deleted are never copied into the merged pool.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsWriter extends FlatVectorsWriter {
+
+  private static final long SHALLOW_RAM_BYTES_USED =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class);
+
+  private final SegmentWriteState segmentWriteState;
+  private final IndexOutput meta;
+  private final IndexOutput vectorData;
+
+  // Pools used during flush — keyed by (dim, encoding); insertion-order 
preserved as pool id.
+  private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>();
+  private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>();
+
+  // Pools used during merge.
+  private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>();
+  private final List<MergeFieldState> mergeFields = new ArrayList<>();
+
+  private boolean finished;
+  private boolean usedForMerge;
+
+  DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) 
throws IOException {
+    super(scorer);
+    this.segmentWriteState = state;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, 
DedupFlatVectorsFormat.META_EXTENSION);
+    String vectorDataFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name,
+            state.segmentSuffix,
+            DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION);
+
+    boolean success = false;
+    IndexOutput m = null, v = null;
+    try {
+      m = state.directory.createOutput(metaFileName, state.context);
+      v = state.directory.createOutput(vectorDataFileName, state.context);
+      CodecUtil.writeIndexHeader(
+          m,
+          DedupFlatVectorsFormat.META_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      CodecUtil.writeIndexHeader(
+          v,
+          DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      this.meta = m;
+      this.vectorData = v;
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(m, v);
+      }
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Indexing path (flush)
+  // 
---------------------------------------------------------------------------
+
+  @Override
+  public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws 
IOException {
+    PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), 
fieldInfo.getVectorEncoding());
+    FlushPool pool = flushPools.computeIfAbsent(key, FlushPool::new);
+    FlushFieldWriter<?> w =
+        switch (fieldInfo.getVectorEncoding()) {
+          case BYTE -> new FlushFieldWriter.ByteImpl(fieldInfo, pool);
+          case FLOAT32 -> new FlushFieldWriter.FloatImpl(fieldInfo, pool);
+        };
+    flushFields.add(w);
+    return w;
+  }
+
+  @Override
+  public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException {
+    if (usedForMerge) {
+      throw new IllegalStateException(
+          "DedupFlatVectorsWriter cannot be used for both flush and merge in 
the same instance");
+    }
+    // 1. Write each pool's contiguous unique-vector bytes to .dvc.
+    long[] poolOffsets = new long[flushPools.size()];
+    long[] poolLengths = new long[flushPools.size()];
+    assignPoolIds(flushPools.values());
+    int p = 0;
+    for (FlushPool pool : flushPools.values()) {
+      long start = alignOutput(vectorData, pool.key.encoding);
+      pool.writeBytes(vectorData);
+      poolOffsets[p] = start;
+      poolLengths[p] = vectorData.getFilePointer() - start;
+      p++;
+    }
+    // 2. Pool metadata.
+    writePoolsMeta(flushPools.values(), poolOffsets, poolLengths);
+    // 3. Per-field DISI + map + meta record.
+    for (FlushFieldWriter<?> field : flushFields) {
+      writeFlushField(field, maxDoc, sortMap);
+      field.finish();
+    }
+  }
+
+  private void assignPoolIds(Iterable<? extends Pool> pools) {
+    int n = 0;
+    for (Pool pool : pools) {
+      pool.poolId = n++;
+    }
+  }
+
+  private void writePoolsMeta(
+      Iterable<? extends Pool> orderedPools, long[] poolOffsets, long[] 
poolLengths)
+      throws IOException {
+    int numPools = poolOffsets.length;
+    meta.writeVInt(numPools);
+    int p = 0;
+    for (Pool pool : orderedPools) {
+      meta.writeVInt(pool.key.dim);
+      meta.writeByte((byte) pool.key.encoding.ordinal());
+      meta.writeVLong(poolOffsets[p]);
+      meta.writeVLong(poolLengths[p]);
+      meta.writeVInt(pool.uniqueCount());
+      p++;
+    }
+  }
+
+  private void writeFlushField(FlushFieldWriter<?> field, int maxDoc, 
Sorter.DocMap sortMap)
+      throws IOException {
+    int cardinality = field.docsWithField.cardinality();
+    DocsWithFieldSet docsToWrite = field.docsWithField;
+    int[] mapValues = new int[cardinality];
+    if (sortMap == null || cardinality == 0) {
+      for (int i = 0; i < cardinality; i++) {
+        mapValues[i] = field.docOrdToVecOrd.get(i);
+      }
+    } else {
+      int[] old2New = new int[cardinality];
+      int[] new2Old = new int[cardinality];
+      DocsWithFieldSet sortedSet = new DocsWithFieldSet();
+      KnnVectorsWriter.mapOldOrdToNewOrd(field.docsWithField, sortMap, 
old2New, new2Old, sortedSet);
+      for (int newOrd = 0; newOrd < cardinality; newOrd++) {
+        mapValues[newOrd] = field.docOrdToVecOrd.get(new2Old[newOrd]);
+      }
+      docsToWrite = sortedSet;
+    }
+    writeFieldRecord(field.fieldInfo, field.pool, maxDoc, docsToWrite, 
mapValues);
+  }
+
+  /**
+   * Emit one field's DISI + packed map to {@code .dvc} and one field meta 
record to {@code .dvm}.
+   * Meta-record format (matching read order in {@link 
DedupFlatVectorsReader.FieldEntry}):
+   *
+   * <pre>
+   *   [int]   fieldNumber
+   *   [byte]  similarityOrdinal
+   *   [vint]  poolId
+   *   [int]   cardinality
+   *   ... OrdToDocDISIReaderConfiguration meta (writes inline) ...
+   *   [vlong] mapOffset
+   *   [vlong] mapLength
+   *   [byte]  bitsPerVecOrd
+   * </pre>
+   */
+  private void writeFieldRecord(
+      FieldInfo fieldInfo,
+      Pool pool,
+      int maxDoc,
+      DocsWithFieldSet docsWithField,
+      int[] docOrdToVecOrd)
+      throws IOException {
+    int cardinality = docsWithField.cardinality();
+    meta.writeInt(fieldInfo.number);
+    meta.writeByte((byte) fieldInfo.getVectorSimilarityFunction().ordinal());
+    meta.writeVInt(pool.poolId);
+    meta.writeInt(cardinality);
+    OrdToDocDISIReaderConfiguration.writeStoredMeta(
+        DIRECT_MONOTONIC_BLOCK_SHIFT, meta, vectorData, cardinality, maxDoc, 
docsWithField);
+    long mapOffset = vectorData.getFilePointer();
+    int bitsPerVecOrd;
+    if (cardinality == 0) {
+      bitsPerVecOrd = 1;
+    } else {
+      int maxVecOrd = Math.max(0, pool.uniqueCount() - 1);
+      bitsPerVecOrd = DirectWriter.bitsRequired(Math.max(1, maxVecOrd));
+      DirectWriter writer = DirectWriter.getInstance(vectorData, cardinality, 
bitsPerVecOrd);
+      for (int v : docOrdToVecOrd) {

Review Comment:
   I'm confused about why we need two layers here (`docOrd`).  Some docs might
not have a vector, so the first layer (today, before this change) is `docid` ->
`vectorOrd`, right?  But with this change, shouldn't it still be that?  Or is
`docOrd` maybe a synonym for `docid`?  I'm confused :)



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import static 
org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.KnnVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocsWithFieldSet;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.DirectWriter;
+
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, 
with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous 
unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link 
DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector 
to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be 
byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into 
{@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment 
(sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the 
merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, 
sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc 
loop becomes two int
+ * reads with no hash + no byte-verify. Lazy materialisation ensures source 
uniques whose docs are
+ * all deleted are never copied into the merged pool.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsWriter extends FlatVectorsWriter {
+
+  private static final long SHALLOW_RAM_BYTES_USED =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class);
+
+  private final SegmentWriteState segmentWriteState;
+  private final IndexOutput meta;
+  private final IndexOutput vectorData;
+
+  // Pools used during flush — keyed by (dim, encoding); insertion-order 
preserved as pool id.
+  private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>();
+  private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>();
+
+  // Pools used during merge.
+  private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>();
+  private final List<MergeFieldState> mergeFields = new ArrayList<>();
+
+  private boolean finished;
+  private boolean usedForMerge;
+
+  DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) 
throws IOException {
+    super(scorer);
+    this.segmentWriteState = state;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, 
DedupFlatVectorsFormat.META_EXTENSION);
+    String vectorDataFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name,
+            state.segmentSuffix,
+            DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION);
+
+    boolean success = false;
+    IndexOutput m = null, v = null;
+    try {
+      m = state.directory.createOutput(metaFileName, state.context);
+      v = state.directory.createOutput(vectorDataFileName, state.context);
+      CodecUtil.writeIndexHeader(
+          m,
+          DedupFlatVectorsFormat.META_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      CodecUtil.writeIndexHeader(
+          v,
+          DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      this.meta = m;
+      this.vectorData = v;
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(m, v);
+      }
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Indexing path (flush)
+  // 
---------------------------------------------------------------------------
+
+  @Override
+  public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws 
IOException {
+    PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), 
fieldInfo.getVectorEncoding());
+    FlushPool pool = flushPools.computeIfAbsent(key, FlushPool::new);
+    FlushFieldWriter<?> w =
+        switch (fieldInfo.getVectorEncoding()) {
+          case BYTE -> new FlushFieldWriter.ByteImpl(fieldInfo, pool);
+          case FLOAT32 -> new FlushFieldWriter.FloatImpl(fieldInfo, pool);
+        };
+    flushFields.add(w);
+    return w;
+  }
+
+  @Override
+  public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException {
+    if (usedForMerge) {
+      throw new IllegalStateException(
+          "DedupFlatVectorsWriter cannot be used for both flush and merge in 
the same instance");
+    }
+    // 1. Write each pool's contiguous unique-vector bytes to .dvc.
+    long[] poolOffsets = new long[flushPools.size()];
+    long[] poolLengths = new long[flushPools.size()];
+    assignPoolIds(flushPools.values());
+    int p = 0;
+    for (FlushPool pool : flushPools.values()) {
+      long start = alignOutput(vectorData, pool.key.encoding);
+      pool.writeBytes(vectorData);
+      poolOffsets[p] = start;
+      poolLengths[p] = vectorData.getFilePointer() - start;
+      p++;
+    }
+    // 2. Pool metadata.
+    writePoolsMeta(flushPools.values(), poolOffsets, poolLengths);
+    // 3. Per-field DISI + map + meta record.
+    for (FlushFieldWriter<?> field : flushFields) {
+      writeFlushField(field, maxDoc, sortMap);
+      field.finish();
+    }
+  }
+
+  private void assignPoolIds(Iterable<? extends Pool> pools) {
+    int n = 0;
+    for (Pool pool : pools) {
+      pool.poolId = n++;
+    }
+  }
+
+  private void writePoolsMeta(
+      Iterable<? extends Pool> orderedPools, long[] poolOffsets, long[] 
poolLengths)
+      throws IOException {
+    int numPools = poolOffsets.length;
+    meta.writeVInt(numPools);
+    int p = 0;
+    for (Pool pool : orderedPools) {
+      meta.writeVInt(pool.key.dim);
+      meta.writeByte((byte) pool.key.encoding.ordinal());
+      meta.writeVLong(poolOffsets[p]);
+      meta.writeVLong(poolLengths[p]);
+      meta.writeVInt(pool.uniqueCount());
+      p++;
+    }
+  }
+
+  private void writeFlushField(FlushFieldWriter<?> field, int maxDoc, 
Sorter.DocMap sortMap)
+      throws IOException {
+    int cardinality = field.docsWithField.cardinality();
+    DocsWithFieldSet docsToWrite = field.docsWithField;
+    int[] mapValues = new int[cardinality];
+    if (sortMap == null || cardinality == 0) {
+      for (int i = 0; i < cardinality; i++) {
+        mapValues[i] = field.docOrdToVecOrd.get(i);
+      }
+    } else {
+      int[] old2New = new int[cardinality];
+      int[] new2Old = new int[cardinality];
+      DocsWithFieldSet sortedSet = new DocsWithFieldSet();
+      KnnVectorsWriter.mapOldOrdToNewOrd(field.docsWithField, sortMap, 
old2New, new2Old, sortedSet);
+      for (int newOrd = 0; newOrd < cardinality; newOrd++) {
+        mapValues[newOrd] = field.docOrdToVecOrd.get(new2Old[newOrd]);
+      }
+      docsToWrite = sortedSet;
+    }
+    writeFieldRecord(field.fieldInfo, field.pool, maxDoc, docsToWrite, 
mapValues);
+  }
+
+  /**
+   * Emit one field's DISI + packed map to {@code .dvc} and one field meta 
record to {@code .dvm}.
+   * Meta-record format (matching read order in {@link 
DedupFlatVectorsReader.FieldEntry}):
+   *
+   * <pre>
+   *   [int]   fieldNumber
+   *   [byte]  similarityOrdinal
+   *   [vint]  poolId
+   *   [int]   cardinality
+   *   ... OrdToDocDISIReaderConfiguration meta (writes inline) ...
+   *   [vlong] mapOffset
+   *   [vlong] mapLength
+   *   [byte]  bitsPerVecOrd
+   * </pre>
+   */
+  private void writeFieldRecord(
+      FieldInfo fieldInfo,
+      Pool pool,
+      int maxDoc,
+      DocsWithFieldSet docsWithField,
+      int[] docOrdToVecOrd)
+      throws IOException {
+    int cardinality = docsWithField.cardinality();
+    meta.writeInt(fieldInfo.number);
+    meta.writeByte((byte) fieldInfo.getVectorSimilarityFunction().ordinal());
+    meta.writeVInt(pool.poolId);
+    meta.writeInt(cardinality);
+    OrdToDocDISIReaderConfiguration.writeStoredMeta(
+        DIRECT_MONOTONIC_BLOCK_SHIFT, meta, vectorData, cardinality, maxDoc, 
docsWithField);
+    long mapOffset = vectorData.getFilePointer();
+    int bitsPerVecOrd;
+    if (cardinality == 0) {
+      bitsPerVecOrd = 1;
+    } else {
+      int maxVecOrd = Math.max(0, pool.uniqueCount() - 1);
+      bitsPerVecOrd = DirectWriter.bitsRequired(Math.max(1, maxVecOrd));
+      DirectWriter writer = DirectWriter.getInstance(vectorData, cardinality, 
bitsPerVecOrd);
+      for (int v : docOrdToVecOrd) {
+        writer.add(v);
+      }
+      writer.finish();
+    }
+    long mapLength = vectorData.getFilePointer() - mapOffset;
+    meta.writeVLong(mapOffset);
+    meta.writeVLong(mapLength);
+    meta.writeByte((byte) bitsPerVecOrd);
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Merge path
+  // 
---------------------------------------------------------------------------
+
+  @Override
+  public void mergeOneFlatVectorField(FieldInfo fieldInfo, MergeState 
mergeState)
+      throws IOException {
+    if (!flushPools.isEmpty()) {
+      throw new IllegalStateException(
+          "DedupFlatVectorsWriter cannot be used for both flush and merge in 
the same instance");
+    }
+    usedForMerge = true;
+    PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), 
fieldInfo.getVectorEncoding());
+    MergePool pool = mergePools.computeIfAbsent(key, MergePool::new);
+    MergeFieldState fieldState = new MergeFieldState(fieldInfo, pool);
+    mergeFields.add(fieldState);
+
+    switch (fieldInfo.getVectorEncoding()) {
+      case BYTE -> mergeByteField(fieldInfo, mergeState, fieldState);
+      case FLOAT32 -> mergeFloatField(fieldInfo, mergeState, fieldState);
+    }
+  }
+
+  private void mergeFloatField(
+      FieldInfo fieldInfo, MergeState mergeState, MergeFieldState fieldState) 
throws IOException {
+    List<FloatSub> subs = new ArrayList<>();
+    for (int i = 0; i < mergeState.knnVectorsReaders.length; i++) {
+      if (mergeState.knnVectorsReaders[i] == null
+          || mergeState.fieldInfos[i].fieldInfo(fieldInfo.name) == null
+          || 
mergeState.fieldInfos[i].fieldInfo(fieldInfo.name).hasVectorValues() == false) {
+        continue;
+      }
+      FloatVectorValues vals = 
mergeState.knnVectorsReaders[i].getFloatVectorValues(fieldInfo.name);
+      if (vals == null) continue;
+      subs.add(new FloatSub(mergeState.docMaps[i], vals, i));
+    }
+    DocIDMerger<FloatSub> merger = DocIDMerger.of(subs, 
mergeState.needsIndexSort);
+    int dim = fieldInfo.getVectorDimension();
+    byte[] candidateBytes = new byte[dim * Float.BYTES];
+    ByteBuffer bb = 
ByteBuffer.wrap(candidateBytes).order(ByteOrder.LITTLE_ENDIAN);
+    // Level-A fast path setup: when a source reader is itself a dedup format 
reader, attach the
+    // source's docOrd→vecOrd table and a lazily-filled srcVecOrd→mergedVecOrd 
map (shared per
+    // (mergedPool, sourceReader)). The doc loop below interns each source 
vec-ord on first
+    // encounter only — uniques whose docs are all deleted are never interned 
(correctness),
+    // and uniques shared by many surviving docs are interned exactly once 
(perf win).

Review Comment:
   Actually I think this should be considered a correctness issue too, i.e.,
all dups should be detected and shared.  We should never see two different
`vecOrd`s pointing to what are actually identical vectors (a false negative).
Hopefully we can `assert` to that effect.  Let's also fix the tests to exercise
this: intentionally create many dup vectors across segments, and then confirm
that after force-merge we have exactly the correct number of unique vectors.



##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import static 
org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.KnnVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocsWithFieldSet;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.DirectWriter;
+
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, 
with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous 
unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link 
DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector 
to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be 
byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into 
{@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment 
(sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the 
merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, 
sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc 
loop becomes two int
+ * reads with no hash + no byte-verify. Lazy materialisation ensures source 
uniques whose docs are
+ * all deleted are never copied into the merged pool.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsWriter extends FlatVectorsWriter {
+
+  private static final long SHALLOW_RAM_BYTES_USED =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class);
+
+  private final SegmentWriteState segmentWriteState;
+  private final IndexOutput meta;
+  private final IndexOutput vectorData;
+
+  // Pools used during flush — keyed by (dim, encoding); insertion-order 
preserved as pool id.
+  private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>();
+  private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>();
+
+  // Pools used during merge.
+  private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>();
+  private final List<MergeFieldState> mergeFields = new ArrayList<>();
+
+  private boolean finished;
+  private boolean usedForMerge;
+
+  DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) 
throws IOException {
+    super(scorer);
+    this.segmentWriteState = state;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, 
DedupFlatVectorsFormat.META_EXTENSION);
+    String vectorDataFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name,
+            state.segmentSuffix,
+            DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION);
+
+    boolean success = false;
+    IndexOutput m = null, v = null;
+    try {
+      m = state.directory.createOutput(metaFileName, state.context);
+      v = state.directory.createOutput(vectorDataFileName, state.context);
+      CodecUtil.writeIndexHeader(
+          m,
+          DedupFlatVectorsFormat.META_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      CodecUtil.writeIndexHeader(
+          v,
+          DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      this.meta = m;
+      this.vectorData = v;
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(m, v);
+      }
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Indexing path (flush)
+  // 
---------------------------------------------------------------------------
+
+  @Override
+  public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws 
IOException {
+    PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), 
fieldInfo.getVectorEncoding());
+    FlushPool pool = flushPools.computeIfAbsent(key, FlushPool::new);
+    FlushFieldWriter<?> w =
+        switch (fieldInfo.getVectorEncoding()) {
+          case BYTE -> new FlushFieldWriter.ByteImpl(fieldInfo, pool);
+          case FLOAT32 -> new FlushFieldWriter.FloatImpl(fieldInfo, pool);
+        };
+    flushFields.add(w);
+    return w;
+  }
+
+  @Override
+  public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException {
+    if (usedForMerge) {
+      throw new IllegalStateException(
+          "DedupFlatVectorsWriter cannot be used for both flush and merge in 
the same instance");
+    }
+    // 1. Write each pool's contiguous unique-vector bytes to .dvc.
+    long[] poolOffsets = new long[flushPools.size()];
+    long[] poolLengths = new long[flushPools.size()];
+    assignPoolIds(flushPools.values());
+    int p = 0;
+    for (FlushPool pool : flushPools.values()) {
+      long start = alignOutput(vectorData, pool.key.encoding);
+      pool.writeBytes(vectorData);
+      poolOffsets[p] = start;
+      poolLengths[p] = vectorData.getFilePointer() - start;
+      p++;
+    }
+    // 2. Pool metadata.
+    writePoolsMeta(flushPools.values(), poolOffsets, poolLengths);
+    // 3. Per-field DISI + map + meta record.
+    for (FlushFieldWriter<?> field : flushFields) {
+      writeFlushField(field, maxDoc, sortMap);
+      field.finish();
+    }
+  }
+
+  private void assignPoolIds(Iterable<? extends Pool> pools) {
+    int n = 0;
+    for (Pool pool : pools) {
+      pool.poolId = n++;
+    }
+  }
+
+  private void writePoolsMeta(
+      Iterable<? extends Pool> orderedPools, long[] poolOffsets, long[] 
poolLengths)
+      throws IOException {
+    int numPools = poolOffsets.length;
+    meta.writeVInt(numPools);
+    int p = 0;
+    for (Pool pool : orderedPools) {
+      meta.writeVInt(pool.key.dim);
+      meta.writeByte((byte) pool.key.encoding.ordinal());
+      meta.writeVLong(poolOffsets[p]);
+      meta.writeVLong(poolLengths[p]);
+      meta.writeVInt(pool.uniqueCount());
+      p++;
+    }
+  }
+
+  private void writeFlushField(FlushFieldWriter<?> field, int maxDoc, 
Sorter.DocMap sortMap)
+      throws IOException {
+    int cardinality = field.docsWithField.cardinality();
+    DocsWithFieldSet docsToWrite = field.docsWithField;
+    int[] mapValues = new int[cardinality];
+    if (sortMap == null || cardinality == 0) {
+      for (int i = 0; i < cardinality; i++) {
+        mapValues[i] = field.docOrdToVecOrd.get(i);
+      }
+    } else {
+      int[] old2New = new int[cardinality];
+      int[] new2Old = new int[cardinality];
+      DocsWithFieldSet sortedSet = new DocsWithFieldSet();
+      KnnVectorsWriter.mapOldOrdToNewOrd(field.docsWithField, sortMap, 
old2New, new2Old, sortedSet);
+      for (int newOrd = 0; newOrd < cardinality; newOrd++) {
+        mapValues[newOrd] = field.docOrdToVecOrd.get(new2Old[newOrd]);
+      }
+      docsToWrite = sortedSet;
+    }
+    writeFieldRecord(field.fieldInfo, field.pool, maxDoc, docsToWrite, 
mapValues);
+  }
+
+  /**
+   * Emit one field's DISI + packed map to {@code .dvc} and one field meta 
record to {@code .dvm}.
+   * Meta-record format (matching read order in {@link 
DedupFlatVectorsReader.FieldEntry}):
+   *
+   * <pre>
+   *   [int]   fieldNumber
+   *   [byte]  similarityOrdinal
+   *   [vint]  poolId
+   *   [int]   cardinality
+   *   ... OrdToDocDISIReaderConfiguration meta (writes inline) ...
+   *   [vlong] mapOffset
+   *   [vlong] mapLength
+   *   [byte]  bitsPerVecOrd
+   * </pre>
+   */
+  private void writeFieldRecord(
+      FieldInfo fieldInfo,
+      Pool pool,
+      int maxDoc,
+      DocsWithFieldSet docsWithField,
+      int[] docOrdToVecOrd)
+      throws IOException {
+    int cardinality = docsWithField.cardinality();
+    meta.writeInt(fieldInfo.number);
+    meta.writeByte((byte) fieldInfo.getVectorSimilarityFunction().ordinal());
+    meta.writeVInt(pool.poolId);
+    meta.writeInt(cardinality);
+    OrdToDocDISIReaderConfiguration.writeStoredMeta(
+        DIRECT_MONOTONIC_BLOCK_SHIFT, meta, vectorData, cardinality, maxDoc, 
docsWithField);
+    long mapOffset = vectorData.getFilePointer();
+    int bitsPerVecOrd;
+    if (cardinality == 0) {
+      bitsPerVecOrd = 1;
+    } else {
+      int maxVecOrd = Math.max(0, pool.uniqueCount() - 1);
+      bitsPerVecOrd = DirectWriter.bitsRequired(Math.max(1, maxVecOrd));
+      DirectWriter writer = DirectWriter.getInstance(vectorData, cardinality, 
bitsPerVecOrd);
+      for (int v : docOrdToVecOrd) {
+        writer.add(v);
+      }
+      writer.finish();
+    }
+    long mapLength = vectorData.getFilePointer() - mapOffset;
+    meta.writeVLong(mapOffset);
+    meta.writeVLong(mapLength);
+    meta.writeByte((byte) bitsPerVecOrd);
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Merge path
+  // 
---------------------------------------------------------------------------
+
+  @Override
+  public void mergeOneFlatVectorField(FieldInfo fieldInfo, MergeState 
mergeState)
+      throws IOException {
+    if (!flushPools.isEmpty()) {
+      throw new IllegalStateException(
+          "DedupFlatVectorsWriter cannot be used for both flush and merge in 
the same instance");
+    }
+    usedForMerge = true;
+    PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), 
fieldInfo.getVectorEncoding());
+    MergePool pool = mergePools.computeIfAbsent(key, MergePool::new);
+    MergeFieldState fieldState = new MergeFieldState(fieldInfo, pool);
+    mergeFields.add(fieldState);
+
+    switch (fieldInfo.getVectorEncoding()) {
+      case BYTE -> mergeByteField(fieldInfo, mergeState, fieldState);
+      case FLOAT32 -> mergeFloatField(fieldInfo, mergeState, fieldState);
+    }
+  }
+
+  private void mergeFloatField(
+      FieldInfo fieldInfo, MergeState mergeState, MergeFieldState fieldState) 
throws IOException {
+    List<FloatSub> subs = new ArrayList<>();
+    for (int i = 0; i < mergeState.knnVectorsReaders.length; i++) {
+      if (mergeState.knnVectorsReaders[i] == null
+          || mergeState.fieldInfos[i].fieldInfo(fieldInfo.name) == null
+          || 
mergeState.fieldInfos[i].fieldInfo(fieldInfo.name).hasVectorValues() == false) {
+        continue;
+      }
+      FloatVectorValues vals = 
mergeState.knnVectorsReaders[i].getFloatVectorValues(fieldInfo.name);
+      if (vals == null) continue;
+      subs.add(new FloatSub(mergeState.docMaps[i], vals, i));
+    }
+    DocIDMerger<FloatSub> merger = DocIDMerger.of(subs, 
mergeState.needsIndexSort);
+    int dim = fieldInfo.getVectorDimension();
+    byte[] candidateBytes = new byte[dim * Float.BYTES];
+    ByteBuffer bb = 
ByteBuffer.wrap(candidateBytes).order(ByteOrder.LITTLE_ENDIAN);
+    // Level-A fast path setup: when a source reader is itself a dedup format 
reader, attach the
+    // source's docOrd→vecOrd table and a lazily-filled srcVecOrd→mergedVecOrd 
map (shared per
+    // (mergedPool, sourceReader)). The doc loop below interns each source 
vec-ord on first
+    // encounter only — uniques whose docs are all deleted are never interned 
(correctness),
+    // and uniques shared by many surviving docs are interned exactly once 
(perf win).

Review Comment:
   And mix in some deletes in those tests too!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to