xiangfu0 commented on code in PR #18090: URL: https://github.com/apache/pinot/pull/18090#discussion_r3036284576
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/vector/IvfPqVectorIndexCreator.java: ########## @@ -0,0 +1,402 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.vector; + +import com.google.common.base.Preconditions; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.index.creator.VectorIndexConfig; +import org.apache.pinot.segment.spi.index.creator.VectorIndexCreator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Creates an IVF_PQ (Inverted File with residual Product Quantization) index for immutable segments. + * + * <p>Uses a two-pass design to bound memory usage. 
During {@link #add}, vectors are spilled to a + * temporary file while a reservoir sample is kept in memory for training. During {@link #seal}, + * centroids and PQ codebooks are trained from the sample, then all vectors are streamed from + * the spill file for assignment and encoding.</p> + * + * <p>Peak heap usage is O(trainSampleSize * dimension) instead of O(numVectors * dimension).</p> + * + * <h3>Thread safety</h3> + * <p>This class is not thread-safe. It is intended for single-threaded offline segment creation.</p> + */ +public class IvfPqVectorIndexCreator implements VectorIndexCreator { + private static final Logger LOGGER = LoggerFactory.getLogger(IvfPqVectorIndexCreator.class); + + /** Magic bytes identifying an IVF_PQ index file: ASCII "IVPQ". */ + public static final int MAGIC = 0x49565051; + + /** Current file format version. */ + public static final int FORMAT_VERSION = 1; + + /** On-disk file extension for the IVF_PQ index. */ + public static final String INDEX_FILE_EXTENSION = V1Constants.Indexes.VECTOR_IVF_PQ_INDEX_FILE_EXTENSION; + + private final String _column; + private final File _indexDir; + private final int _dimension; + private final int _nlist; + private final int _pqM; + private final int _pqNbits; + private final int _trainSampleSize; + private final long _trainingSeed; + private final VectorIndexConfig.VectorDistanceFunction _distanceFunction; + + // Spill file for raw vectors: sequential float arrays, _dimension floats per vector. + private final File _spillFile; + private final DataOutputStream _spillOut; + + // Reservoir sample for training. Kept in memory; bounded by _trainSampleSize. + private final float[][] _trainingSample; + private int _numVectors; + private boolean _sealed; + + /** + * Creates a new IVF_PQ creator. 
+ * + * @param column the column name + * @param indexDir the index directory + * @param config vector index configuration + */ + public IvfPqVectorIndexCreator(String column, File indexDir, VectorIndexConfig config) + throws IOException { + _column = column; + _indexDir = indexDir; + _dimension = config.getVectorDimension(); + _distanceFunction = config.getVectorDistanceFunction(); + + Map<String, String> properties = + Preconditions.checkNotNull(config.getProperties(), "IVF_PQ properties are required"); + _nlist = parseRequiredPositiveInt(properties, "nlist"); + _pqM = parseRequiredPositiveInt(properties, "pqM"); + _pqNbits = parseRequiredPositiveInt(properties, "pqNbits"); + _trainSampleSize = parseRequiredPositiveInt(properties, "trainSampleSize"); + _trainingSeed = parseLong(properties, "trainingSeed", System.nanoTime()); + + Preconditions.checkArgument(_dimension > 0, "Vector dimension must be positive, got: %s", _dimension); + Preconditions.checkArgument(_nlist > 0, "nlist must be positive, got: %s", _nlist); + Preconditions.checkArgument(_pqM > 0, "pqM must be positive, got: %s", _pqM); + Preconditions.checkArgument(_pqM <= _dimension, "pqM must be <= dimension, got pqM=%s dimension=%s", _pqM, + _dimension); + Preconditions.checkArgument(_dimension % _pqM == 0, + "IVF_PQ pqM (%s) must evenly divide vectorDimension (%s)", _pqM, _dimension); + Preconditions.checkArgument(_pqNbits == 4 || _pqNbits == 6 || _pqNbits == 8, + "IVF_PQ pqNbits must be one of [4, 6, 8], got: %s", _pqNbits); + Preconditions.checkArgument(_trainSampleSize > 0, "trainSampleSize must be positive, got: %s", _trainSampleSize); + Preconditions.checkArgument(_trainSampleSize >= _nlist, + "IVF_PQ trainSampleSize (%s) must be >= nlist (%s)", _trainSampleSize, _nlist); + + _trainingSample = new float[_trainSampleSize][]; + _numVectors = 0; + + _spillFile = new File(indexDir, column + INDEX_FILE_EXTENSION + ".spill"); + _spillOut = new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(_spillFile), 1 << 16)); + + LOGGER.info("Creating IVF_PQ index for column: {} in dir: {}, dimension={}, nlist={}, pqM={}, pqNbits={}, " + + "distance={}", column, indexDir.getAbsolutePath(), _dimension, _nlist, _pqM, _pqNbits, + _distanceFunction); + } + + @Override + public void add(Object[] values, @Nullable int[] dictIds) { + Preconditions.checkArgument(values.length == _dimension, + "Vector dimension mismatch: expected %s, got %s", _dimension, values.length); + float[] floatValues = new float[_dimension]; + for (int i = 0; i < _dimension; i++) { + floatValues[i] = (Float) values[i]; + } + add(floatValues); + } + + @Override + public void add(float[] document) { + Preconditions.checkState(!_sealed, "Cannot add documents after seal()"); + Preconditions.checkArgument(document.length == _dimension, + "Vector dimension mismatch: expected %s, got %s", _dimension, document.length); + + // Spill to disk + try { + for (int d = 0; d < _dimension; d++) { + _spillOut.writeFloat(document[d]); + } + } catch (IOException e) { + throw new RuntimeException("Failed to spill vector to disk for column: " + _column, e); + } + + // Reservoir sampling: keep up to _trainSampleSize vectors in memory + if (_numVectors < _trainSampleSize) { + _trainingSample[_numVectors] = document.clone(); + } else { + // Replace with decreasing probability to maintain uniform sampling + Random reservoirRng = new Random(_trainingSeed + _numVectors); + int j = reservoirRng.nextInt(_numVectors + 1); + if (j < _trainSampleSize) { + _trainingSample[j] = document.clone(); + } Review Comment: Handled in d08a44fb0e. The creator now keeps a single seeded `_reservoirRng` and reuses it for reservoir sampling instead of allocating a new `Random` per ingested vector, so the sampling stays deterministic without the per-row object churn. 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/vector/VectorQuantizationUtils.java: ########## @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.vector; + +import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.Random; +import org.apache.pinot.common.function.scalar.VectorFunctions; +import org.apache.pinot.segment.spi.index.creator.VectorIndexConfig; + + +/** + * Utility methods for offline vector quantization. 
+ * + * <p>Shared by the IVF_PQ creator, reader, and benchmark harness.</p> + */ +public final class VectorQuantizationUtils { + private static final int MAX_KMEANS_ITERATIONS = 50; + private static final float CONVERGENCE_THRESHOLD = 1e-5f; + + private VectorQuantizationUtils() { + } + + public static int[] computeSubvectorLengths(int dimension, int pqM) { + Preconditions.checkArgument(dimension > 0, "dimension must be positive"); + Preconditions.checkArgument(pqM > 0, "pqM must be positive"); + Preconditions.checkArgument(pqM <= dimension, "pqM must be <= dimension"); + + int[] lengths = new int[pqM]; + int base = dimension / pqM; + int remainder = dimension % pqM; + for (int i = 0; i < pqM; i++) { + lengths[i] = base + (i < remainder ? 1 : 0); + } + return lengths; + } + + public static int[] computeSubvectorOffsets(int[] lengths) { + int[] offsets = new int[lengths.length]; + int offset = 0; + for (int i = 0; i < lengths.length; i++) { + offsets[i] = offset; + offset += lengths[i]; + } + return offsets; + } + + public static float[] normalizeCopy(float[] vector) { + float norm = 0.0f; + for (float v : vector) { + norm += v * v; + } + norm = (float) Math.sqrt(norm); + float[] result = new float[vector.length]; + if (norm > 0.0f) { + for (int i = 0; i < vector.length; i++) { + result[i] = vector[i] / norm; + } + } + return result; + } + + public static float[] transformForDistance(float[] vector, + VectorIndexConfig.VectorDistanceFunction distanceFunction) { + if (distanceFunction == VectorIndexConfig.VectorDistanceFunction.COSINE) { + return normalizeCopy(vector); + } + return vector.clone(); + } + + public static float[][] transformAll(float[][] vectors, VectorIndexConfig.VectorDistanceFunction distanceFunction) { + float[][] transformed = new float[vectors.length][]; + for (int i = 0; i < vectors.length; i++) { + transformed[i] = transformForDistance(vectors[i], distanceFunction); + } + return transformed; + } + + public static float computeDistance(float[] a, 
float[] b, + VectorIndexConfig.VectorDistanceFunction distanceFunction) { + switch (distanceFunction) { + case EUCLIDEAN: + case L2: + return (float) VectorFunctions.euclideanDistance(a, b); + case COSINE: + return (float) VectorFunctions.cosineDistance(a, b, 1.0d); + case INNER_PRODUCT: + case DOT_PRODUCT: + return (float) -VectorFunctions.dotProduct(a, b); + default: + throw new IllegalArgumentException("Unsupported distance function: " + distanceFunction); + } + } + + public static int findNearestCentroid(float[] vector, float[][] centroids, + VectorIndexConfig.VectorDistanceFunction distanceFunction) { + Preconditions.checkArgument(centroids.length > 0, "centroids must not be empty"); + int nearest = 0; + float nearestDistance = Float.MAX_VALUE; + for (int i = 0; i < centroids.length; i++) { + float distance = computeDistance(vector, centroids[i], distanceFunction); + if (distance < nearestDistance) { + nearestDistance = distance; + nearest = i; + } + } + return nearest; + } + + public static int[] assignVectors(float[][] vectors, float[][] centroids, + VectorIndexConfig.VectorDistanceFunction distanceFunction) { + int[] assignments = new int[vectors.length]; + for (int i = 0; i < vectors.length; i++) { + assignments[i] = findNearestCentroid(vectors[i], centroids, distanceFunction); + } + return assignments; + } + + public static float[][] trainKMeans(float[][] samples, int numCentroids, long seed, + VectorIndexConfig.VectorDistanceFunction distanceFunction) { + Preconditions.checkArgument(numCentroids > 0, "numCentroids must be positive"); + Preconditions.checkArgument(samples != null, "samples must not be null"); + Preconditions.checkArgument(samples.length > 0, "samples must not be empty"); + + int dimension = samples[0].length; + float[][] centroids = initializeCentroids(samples, numCentroids, seed, distanceFunction); + float[][] newCentroids = new float[numCentroids][dimension]; + int[] counts = new int[numCentroids]; + + for (int iteration = 0; iteration 
< MAX_KMEANS_ITERATIONS; iteration++) { + Arrays.fill(counts, 0); + for (float[] row : newCentroids) { + Arrays.fill(row, 0.0f); + } + + for (int i = 0; i < samples.length; i++) { + int centroid = findNearestCentroid(samples[i], centroids, distanceFunction); + counts[centroid]++; + for (int d = 0; d < dimension; d++) { + newCentroids[centroid][d] += samples[i][d]; + } + } + + float maxMovement = 0.0f; + for (int c = 0; c < numCentroids; c++) { + if (counts[c] == 0) { + newCentroids[c] = centroids[c].clone(); + continue; + } + for (int d = 0; d < dimension; d++) { + newCentroids[c][d] /= counts[c]; + } + maxMovement = Math.max(maxMovement, (float) VectorFunctions.euclideanDistance(centroids[c], newCentroids[c])); + } + + centroids = clone2d(newCentroids); + if (maxMovement < CONVERGENCE_THRESHOLD) { + break; + } + } + return centroids; + } + + private static float[][] initializeCentroids(float[][] samples, int numCentroids, long seed, + VectorIndexConfig.VectorDistanceFunction distanceFunction) { + int sampleCount = samples.length; + int dimension = samples[0].length; + float[][] centroids = new float[numCentroids][dimension]; + Random random = new Random(seed); + + if (sampleCount >= numCentroids) { + centroids[0] = samples[random.nextInt(sampleCount)].clone(); + float[] minDistances = new float[sampleCount]; + Arrays.fill(minDistances, Float.MAX_VALUE); + + for (int centroid = 1; centroid < numCentroids; centroid++) { + float totalWeight = 0.0f; + for (int i = 0; i < sampleCount; i++) { + float distance = computeDistance(samples[i], centroids[centroid - 1], distanceFunction); + if (distance < minDistances[i]) { + minDistances[i] = distance; + } + totalWeight += minDistances[i]; + } + + if (totalWeight <= 0.0f) { + int fallback = random.nextInt(sampleCount); + centroids[centroid] = samples[fallback].clone(); + continue; + } Review Comment: Handled in d08a44fb0e. 
`VectorQuantizationUtils.initializeCentroids()` now uses a dedicated non-negative `computeTrainingDistance()` helper for k-means++ weighting, matching the IVF_FLAT training behavior: cosine normalizes first, and the other distance functions use Euclidean training distance instead of similarity-derived negative weights. I also added coverage in `VectorQuantizationUtilsTest` for DOT_PRODUCT and COSINE training distance behavior. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/vector/VectorIndexType.java: ########## @@ -178,12 +189,22 @@ public VectorIndexReader createIndexReader(SegmentDirectory.Reader segmentReader File segmentDir = segmentReader.toSegmentDirectory().getPath().toFile(); VectorIndexConfig indexConfig = fieldIndexConfigs.getConfig(StandardIndexes.vector()); VectorBackendType backendType = indexConfig.resolveBackendType(); + File configuredIndexFile = + SegmentDirectoryPaths.findVectorIndexIndexFile(segmentDir, metadata.getColumnName(), indexConfig); + if (configuredIndexFile == null || !configuredIndexFile.exists()) { Review Comment: Handled in d08a44fb0e. `VectorIndexType.ReaderFactory#createIndexReader()` now returns `null` immediately when the configured vector index is disabled, before backend resolution or artifact lookup. I also added `testReaderFactoryReturnsNullWhenVectorIndexIsDisabled()` to lock the behavior. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
