kaivalnp commented on code in PR #15979:
URL: https://github.com/apache/lucene/pull/15979#discussion_r3398786102


##########
lucene/core/src/java/org/apache/lucene/codecs/dedup/DedupFlatVectorsWriter.java:
##########
@@ -0,0 +1,1024 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.codecs.dedup;
+
+import static 
org.apache.lucene.codecs.dedup.DedupFlatVectorsFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.KnnVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatFieldVectorsWriter;
+import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
+import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
+import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
+import org.apache.lucene.index.ByteVectorValues;
+import org.apache.lucene.index.DocIDMerger;
+import org.apache.lucene.index.DocsWithFieldSet;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.KnnVectorValues;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.packed.DirectWriter;
+
+/**
+ * Writes a {@link DedupFlatVectorsFormat} segment.
+ *
+ * <p><b>Indexing (flush)</b>: per-field calls accumulate vectors in heap, 
with a per-pool dedup
+ * hash table ({@link FlushPool}). {@code flush()} writes contiguous 
unique-vector bytes per pool,
+ * then per-field {@code docOrd → vecOrd} maps.
+ *
+ * <p><b>Merge</b>: per-field calls iterate source segments via {@link 
DocIDMerger}. A per-pool
+ * {@link MergePool} hash table maps the 64-bit hash of each candidate vector 
to a {@code
+ * (KnnVectorValues, srcOrd)} pair so that hash-collisions can be 
byte-verified by re-reading the
+ * source via mmap. No temp files are written; pool data is materialised into 
{@code .dvc} at {@link
+ * #finish()} time by re-reading each unique vector from its source segment 
(sources stay open until
+ * merge ends).
+ *
+ * <p>When a source reader is itself a {@link DedupFlatVectorsReader}, the 
merge takes a fast path
+ * (Level A): each source vec-ord is interned <em>once</em> per (mergedPool, 
sourceReader) pair and
+ * cached in a sparse {@code srcVecOrd → mergedVecOrd} array, so the per-doc 
loop becomes two int
+ * reads with no hash + no byte-verify. Lazy materialisation ensures source 
uniques whose docs are
+ * all deleted are never copied into the merged pool.
+ *
+ * @lucene.experimental
+ */
+public final class DedupFlatVectorsWriter extends FlatVectorsWriter {
+
+  private static final long SHALLOW_RAM_BYTES_USED =
+      RamUsageEstimator.shallowSizeOfInstance(DedupFlatVectorsWriter.class);
+
+  private final SegmentWriteState segmentWriteState;
+  private final IndexOutput meta;
+  private final IndexOutput vectorData;
+
+  // Pools used during flush — keyed by (dim, encoding); insertion-order 
preserved as pool id.
+  private final Map<PoolKey, FlushPool> flushPools = new LinkedHashMap<>();
+  private final List<FlushFieldWriter<?>> flushFields = new ArrayList<>();
+
+  // Pools used during merge.
+  private final Map<PoolKey, MergePool> mergePools = new LinkedHashMap<>();
+  private final List<MergeFieldState> mergeFields = new ArrayList<>();
+
+  private boolean finished;
+  private boolean usedForMerge;
+
+  DedupFlatVectorsWriter(SegmentWriteState state, FlatVectorsScorer scorer) 
throws IOException {
+    super(scorer);
+    this.segmentWriteState = state;
+    String metaFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name, state.segmentSuffix, 
DedupFlatVectorsFormat.META_EXTENSION);
+    String vectorDataFileName =
+        IndexFileNames.segmentFileName(
+            state.segmentInfo.name,
+            state.segmentSuffix,
+            DedupFlatVectorsFormat.VECTOR_DATA_EXTENSION);
+
+    boolean success = false;
+    IndexOutput m = null, v = null;
+    try {
+      m = state.directory.createOutput(metaFileName, state.context);
+      v = state.directory.createOutput(vectorDataFileName, state.context);
+      CodecUtil.writeIndexHeader(
+          m,
+          DedupFlatVectorsFormat.META_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      CodecUtil.writeIndexHeader(
+          v,
+          DedupFlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
+          DedupFlatVectorsFormat.VERSION_CURRENT,
+          state.segmentInfo.getId(),
+          state.segmentSuffix);
+      this.meta = m;
+      this.vectorData = v;
+      success = true;
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(m, v);
+      }
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Indexing path (flush)
+  // 
---------------------------------------------------------------------------
+
+  @Override
+  public FlatFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws 
IOException {
+    PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), 
fieldInfo.getVectorEncoding());
+    FlushPool pool = flushPools.computeIfAbsent(key, FlushPool::new);
+    FlushFieldWriter<?> w =
+        switch (fieldInfo.getVectorEncoding()) {
+          case BYTE -> new FlushFieldWriter.ByteImpl(fieldInfo, pool);
+          case FLOAT32 -> new FlushFieldWriter.FloatImpl(fieldInfo, pool);
+        };
+    flushFields.add(w);
+    return w;
+  }
+
+  @Override
+  public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException {
+    if (usedForMerge) {
+      throw new IllegalStateException(
+          "DedupFlatVectorsWriter cannot be used for both flush and merge in 
the same instance");
+    }
+    // 1. Write each pool's contiguous unique-vector bytes to .dvc.
+    long[] poolOffsets = new long[flushPools.size()];
+    long[] poolLengths = new long[flushPools.size()];
+    assignPoolIds(flushPools.values());
+    int p = 0;
+    for (FlushPool pool : flushPools.values()) {
+      long start = alignOutput(vectorData, pool.key.encoding);
+      pool.writeBytes(vectorData);
+      poolOffsets[p] = start;
+      poolLengths[p] = vectorData.getFilePointer() - start;
+      p++;
+    }
+    // 2. Pool metadata.
+    writePoolsMeta(flushPools.values(), poolOffsets, poolLengths);
+    // 3. Per-field DISI + map + meta record.
+    for (FlushFieldWriter<?> field : flushFields) {
+      writeFlushField(field, maxDoc, sortMap);
+      field.finish();
+    }
+  }
+
+  private void assignPoolIds(Iterable<? extends Pool> pools) {
+    int n = 0;
+    for (Pool pool : pools) {
+      pool.poolId = n++;
+    }
+  }
+
+  private void writePoolsMeta(
+      Iterable<? extends Pool> orderedPools, long[] poolOffsets, long[] 
poolLengths)
+      throws IOException {
+    int numPools = poolOffsets.length;
+    meta.writeVInt(numPools);
+    int p = 0;
+    for (Pool pool : orderedPools) {
+      meta.writeVInt(pool.key.dim);
+      meta.writeByte((byte) pool.key.encoding.ordinal());
+      meta.writeVLong(poolOffsets[p]);
+      meta.writeVLong(poolLengths[p]);
+      meta.writeVInt(pool.uniqueCount());
+      p++;
+    }
+  }
+
+  private void writeFlushField(FlushFieldWriter<?> field, int maxDoc, 
Sorter.DocMap sortMap)
+      throws IOException {
+    int cardinality = field.docsWithField.cardinality();
+    DocsWithFieldSet docsToWrite = field.docsWithField;
+    int[] mapValues = new int[cardinality];
+    if (sortMap == null || cardinality == 0) {
+      for (int i = 0; i < cardinality; i++) {
+        mapValues[i] = field.docOrdToVecOrd.get(i);
+      }
+    } else {
+      int[] old2New = new int[cardinality];
+      int[] new2Old = new int[cardinality];
+      DocsWithFieldSet sortedSet = new DocsWithFieldSet();
+      KnnVectorsWriter.mapOldOrdToNewOrd(field.docsWithField, sortMap, 
old2New, new2Old, sortedSet);
+      for (int newOrd = 0; newOrd < cardinality; newOrd++) {
+        mapValues[newOrd] = field.docOrdToVecOrd.get(new2Old[newOrd]);
+      }
+      docsToWrite = sortedSet;
+    }
+    writeFieldRecord(field.fieldInfo, field.pool, maxDoc, docsToWrite, 
mapValues);
+  }
+
+  /**
+   * Emit one field's DISI + packed map to {@code .dvc} and one field meta 
record to {@code .dvm}.
+   * Meta-record format (matching read order in {@link 
DedupFlatVectorsReader.FieldEntry}):
+   *
+   * <pre>
+   *   [int]   fieldNumber
+   *   [byte]  similarityOrdinal
+   *   [vint]  poolId
+   *   [int]   cardinality
+   *   ... OrdToDocDISIReaderConfiguration meta (writes inline) ...
+   *   [vlong] mapOffset
+   *   [vlong] mapLength
+   *   [byte]  bitsPerVecOrd
+   * </pre>
+   */
+  private void writeFieldRecord(
+      FieldInfo fieldInfo,
+      Pool pool,
+      int maxDoc,
+      DocsWithFieldSet docsWithField,
+      int[] docOrdToVecOrd)
+      throws IOException {
+    int cardinality = docsWithField.cardinality();
+    meta.writeInt(fieldInfo.number);
+    meta.writeByte((byte) fieldInfo.getVectorSimilarityFunction().ordinal());
+    meta.writeVInt(pool.poolId);
+    meta.writeInt(cardinality);
+    OrdToDocDISIReaderConfiguration.writeStoredMeta(
+        DIRECT_MONOTONIC_BLOCK_SHIFT, meta, vectorData, cardinality, maxDoc, 
docsWithField);
+    long mapOffset = vectorData.getFilePointer();
+    int bitsPerVecOrd;
+    if (cardinality == 0) {
+      bitsPerVecOrd = 1;
+    } else {
+      int maxVecOrd = Math.max(0, pool.uniqueCount() - 1);
+      bitsPerVecOrd = DirectWriter.bitsRequired(Math.max(1, maxVecOrd));
+      DirectWriter writer = DirectWriter.getInstance(vectorData, cardinality, 
bitsPerVecOrd);
+      for (int v : docOrdToVecOrd) {
+        writer.add(v);
+      }
+      writer.finish();
+    }
+    long mapLength = vectorData.getFilePointer() - mapOffset;
+    meta.writeVLong(mapOffset);
+    meta.writeVLong(mapLength);
+    meta.writeByte((byte) bitsPerVecOrd);
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Merge path
+  // 
---------------------------------------------------------------------------
+
+  @Override
+  public void mergeOneFlatVectorField(FieldInfo fieldInfo, MergeState 
mergeState)
+      throws IOException {
+    if (!flushPools.isEmpty()) {
+      throw new IllegalStateException(
+          "DedupFlatVectorsWriter cannot be used for both flush and merge in 
the same instance");
+    }
+    usedForMerge = true;
+    PoolKey key = new PoolKey(fieldInfo.getVectorDimension(), 
fieldInfo.getVectorEncoding());
+    MergePool pool = mergePools.computeIfAbsent(key, MergePool::new);
+    MergeFieldState fieldState = new MergeFieldState(fieldInfo, pool);
+    mergeFields.add(fieldState);
+
+    switch (fieldInfo.getVectorEncoding()) {
+      case BYTE -> mergeByteField(fieldInfo, mergeState, fieldState);
+      case FLOAT32 -> mergeFloatField(fieldInfo, mergeState, fieldState);
+    }
+  }
+
+  private void mergeFloatField(
+      FieldInfo fieldInfo, MergeState mergeState, MergeFieldState fieldState) 
throws IOException {
+    List<FloatSub> subs = new ArrayList<>();
+    for (int i = 0; i < mergeState.knnVectorsReaders.length; i++) {
+      if (mergeState.knnVectorsReaders[i] == null
+          || mergeState.fieldInfos[i].fieldInfo(fieldInfo.name) == null
+          || 
mergeState.fieldInfos[i].fieldInfo(fieldInfo.name).hasVectorValues() == false) {
+        continue;
+      }
+      FloatVectorValues vals = 
mergeState.knnVectorsReaders[i].getFloatVectorValues(fieldInfo.name);
+      if (vals == null) continue;
+      subs.add(new FloatSub(mergeState.docMaps[i], vals, i));
+    }
+    DocIDMerger<FloatSub> merger = DocIDMerger.of(subs, 
mergeState.needsIndexSort);
+    int dim = fieldInfo.getVectorDimension();
+    byte[] candidateBytes = new byte[dim * Float.BYTES];
+    ByteBuffer bb = 
ByteBuffer.wrap(candidateBytes).order(ByteOrder.LITTLE_ENDIAN);
+    // Level-A fast path setup: when a source reader is itself a dedup format 
reader, attach the
+    // source's docOrd→vecOrd table and a lazily-filled srcVecOrd→mergedVecOrd 
map (shared per
+    // (mergedPool, sourceReader)). The doc loop below interns each source 
vec-ord on first
+    // encounter only — uniques whose docs are all deleted are never interned 
(correctness),

Review Comment:
   This is an artifact of iterating on the AI's original solution, which 
eagerly merged "pools" across segments, even if some vector was no longer 
"referenced" by a document -- leading to ever-growing indices.
   
   I asked it to merge lazily, only copying bytes that are referenced -- and 
some of those inner workings leaked into the comments.
   
   I'll update to avoid this^



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to