Copilot commented on code in PR #6995: URL: https://github.com/apache/paimon/pull/6995#discussion_r2688766080
########## paimon-python/pypaimon/globalindex/global_index_scan_builder.py: ########## @@ -0,0 +1,204 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Builder for scanning global indexes.""" + +from abc import ABC, abstractmethod +from typing import List, Optional, Collection +from concurrent.futures import ThreadPoolExecutor, as_completed + +from pypaimon.globalindex import GlobalIndexIOMeta, GlobalIndexReader, GlobalIndexEvaluator +from pypaimon.globalindex.range import Range +from pypaimon.globalindex.global_index_result import GlobalIndexResult + + +class GlobalIndexScanBuilder(ABC): + """Builder for scanning global indexes.""" + + @abstractmethod + def with_snapshot(self, snapshot_or_id) -> 'GlobalIndexScanBuilder': + """Set the snapshot to scan.""" + pass + + @abstractmethod + def with_partition_predicate(self, partition_predicate) -> 'GlobalIndexScanBuilder': + """Set the partition predicate.""" + pass + + @abstractmethod + def with_row_range(self, row_range: Range) -> 'GlobalIndexScanBuilder': + """Set the row range to scan.""" + pass + + @abstractmethod + 
def build(self) -> 'RowRangeGlobalIndexScanner': + """Build the scanner.""" + pass + + @abstractmethod + def shard_list(self) -> List[Range]: + """Return sorted and non-overlapping ranges.""" + pass + + @staticmethod + def parallel_scan( + ranges: List[Range], + builder: 'GlobalIndexScanBuilder', + filter_predicate: Optional['Predicate'], + vector_search: Optional['VectorSearch'], + thread_num: Optional[int] = None + ) -> Optional[GlobalIndexResult]: + + if not ranges: + return None + + scanners = [] + try: + # Build scanners for each range + for row_range in ranges: + scanner = builder.with_row_range(row_range).build() + scanners.append((row_range, scanner)) + + # Execute scans in parallel + results: List[Optional[GlobalIndexResult]] = [None] * len(ranges) + + def scan_range(idx: int, scanner: 'RowRangeGlobalIndexScanner') -> tuple: + result = scanner.scan(filter_predicate, vector_search) + return idx, result + + with ThreadPoolExecutor(max_workers=thread_num) as executor: Review Comment: The `thread_num` parameter can be None, which allows ThreadPoolExecutor to use its default worker count (min(32, os.cpu_count() + 4) on Python 3.8+; the older "5x the number of CPU cores" default applied only to Python 3.5–3.7). While this is acceptable, consider documenting this behavior or setting a reasonable maximum to prevent resource exhaustion when many ranges are scanned in parallel. ########## paimon-python/pypaimon/globalindex/range.py: ########## @@ -0,0 +1,189 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Range utilities for global index.""" + +from dataclasses import dataclass +from typing import List + + +@dataclass +class Range: + """Represents a range [from_, to] inclusive.""" + + from_: int + to: int + + def __post_init__(self): + if self.from_ > self.to: + raise ValueError(f"Invalid range: from ({self.from_}) > to ({self.to})") Review Comment: The error message uses parameter names 'from' and 'to' but the actual field names are 'from_' and 'to'. Consider updating the message to use 'from_' and 'to' or using more descriptive terms like 'start' and 'end' to avoid confusion with the field names. ```suggestion raise ValueError(f"Invalid range: start ({self.from_}) > end ({self.to})") ``` ########## paimon-python/pypaimon/globalindex/faiss/faiss_vector_reader.py: ########## @@ -0,0 +1,298 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +""" +FAISS Vector Global Index Reader. +""" + +import os +import tempfile +import uuid +from typing import Dict, List, Optional + +import numpy as np + +from pypaimon.globalindex.global_index_reader import GlobalIndexReader +from pypaimon.globalindex.global_index_result import GlobalIndexResult +from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta +from pypaimon.globalindex.vector_search_result import DictBasedVectorSearchResult +from pypaimon.globalindex.roaring_bitmap import RoaringBitmap64 +from pypaimon.globalindex.faiss.faiss_options import ( + FaissVectorIndexOptions, + FaissVectorMetric, + FaissIndexType, +) +from pypaimon.globalindex.faiss.faiss_index_meta import FaissIndexMeta +from pypaimon.globalindex.faiss.faiss_index import FaissIndex + + +class FaissVectorGlobalIndexReader(GlobalIndexReader): + """ + Vector global index reader using FAISS. 
+ """ + + def __init__( + self, + file_io: 'FileIO', + index_path: str, + io_metas: List[GlobalIndexIOMeta], + options: FaissVectorIndexOptions + ): + self._file_io = file_io + self._index_path = index_path + self._io_metas = io_metas + self._options = options + + self._indices: List[Optional[FaissIndex]] = [] + self._index_metas: List[FaissIndexMeta] = [] + self._local_index_files: List[str] = [] + + self._metas_loaded = False + self._indices_loaded = False + + def visit_vector_search(self, vector_search: 'VectorSearch') -> Optional[GlobalIndexResult]: + """Perform vector similarity search.""" + try: + # First load only metadata + self._ensure_load_metas() + + include_row_ids = vector_search.include_row_ids + + # If include_row_ids is specified, check which indices contain matching rows + if include_row_ids is not None: + matching_indices = [] + for i, meta in enumerate(self._index_metas): + if self._has_overlap(meta.min_id, meta.max_id, include_row_ids): + matching_indices.append(i) + + # If no index contains matching rowIds, return empty + if not matching_indices: + return None + + # Load only matching indices + self._ensure_load_indices(matching_indices) + else: + # Load all indices + self._ensure_load_all_indices() + + return self._search(vector_search) + + except Exception as e: + raise RuntimeError( + f"Failed to search FAISS vector index with field_name={vector_search.field_name}, " + f"limit={vector_search.limit}" + ) from e + + def _has_overlap(self, min_id: int, max_id: int, include_row_ids: RoaringBitmap64) -> bool: + """Check if the range [min_id, max_id] has any overlap with include_row_ids.""" + for row_id in include_row_ids: + if min_id <= row_id <= max_id: + return True + if row_id > max_id: + break + return False + + def _search(self, vector_search: 'VectorSearch') -> Optional[GlobalIndexResult]: + """Perform the actual search across all loaded indices.""" + query_vector = np.array(vector_search.vector, dtype=np.float32) + + # L2 normalize the 
query vector if enabled + if self._options.normalize: + query_vector = self._normalize_l2(query_vector) + + limit = vector_search.limit + include_row_ids = vector_search.include_row_ids + + # When filtering is enabled, fetch more results + search_k = limit + if include_row_ids is not None: + search_k = max( + limit * self._options.search_factor, + include_row_ids.cardinality() + ) + + # Collect results from all indices using a priority queue approach + results: Dict[int, float] = {} + + for index in self._indices: + if index is None: + continue + + # Configure search parameters based on index type + self._configure_search_params(index) + + # Limit search_k to index size + effective_k = min(search_k, max(1, index.size())) + if effective_k <= 0: + continue + + # Perform search + distances, labels = index.search(query_vector, effective_k) + + for i in range(effective_k): + row_id = int(labels[0, i]) + if row_id < 0: + # Invalid result + continue + + # Filter by include row IDs if specified + if include_row_ids is not None and row_id not in include_row_ids: + continue + + # Convert distance to score (higher is better) + score = self._convert_distance_to_score(float(distances[0, i])) + + # Keep top-k results + if len(results) < limit: + results[row_id] = score + else: + # Find minimum score in current results + min_row_id = min(results.keys(), key=lambda k: results[k]) + if score > results[min_row_id]: + del results[min_row_id] + results[row_id] = score + + if not results: + return None + + return DictBasedVectorSearchResult(results) + + def _configure_search_params(self, index: FaissIndex) -> None: + """Configure search parameters based on index type.""" + if index.index_type == FaissIndexType.HNSW: + index.set_hnsw_ef_search(self._options.ef_search) + elif index.index_type in (FaissIndexType.IVF, FaissIndexType.IVF_PQ, FaissIndexType.IVF_SQ8): + # For small indices, use higher nprobe + effective_nprobe = max( + self._options.nprobe, + max(1, index.size() // 10) + ) + 
index.set_ivf_nprobe(effective_nprobe) + + def _convert_distance_to_score(self, distance: float) -> float: + """Convert distance to similarity score.""" + if self._options.metric == FaissVectorMetric.L2: + # For L2 distance, smaller is better, so invert it + return 1.0 / (1.0 + distance) + else: + # Inner product is already a similarity + return distance + + @staticmethod + def _normalize_l2(vector: np.ndarray) -> np.ndarray: + """L2 normalize the vector.""" + norm = np.linalg.norm(vector) + if norm > 0: + return vector / norm + return vector + + def _ensure_load_metas(self) -> None: + """Load only metadata from all index files.""" + if self._metas_loaded: + return + + for io_meta in self._io_metas: + if io_meta.metadata: + meta = FaissIndexMeta.deserialize(io_meta.metadata) + self._index_metas.append(meta) + + self._metas_loaded = True + + def _ensure_load_all_indices(self) -> None: + """Load all indices.""" + if self._indices_loaded: + return + + for i in range(len(self._io_metas)): + self._load_index_at(i) + + self._indices_loaded = True + + def _ensure_load_indices(self, positions: List[int]) -> None: + """Load only the specified indices by their positions.""" + # Ensure indices list is large enough + while len(self._indices) < len(self._io_metas): + self._indices.append(None) + + for pos in positions: + if self._indices[pos] is None: + self._load_index_at(pos) + + def _load_index_at(self, position: int) -> None: + """Load a single index at the specified position.""" + io_meta = self._io_metas[position] + + # Read index file from storage + index_file_path = f"{self._index_path}/{io_meta.file_name}" + + # Create a temp file for the FAISS index + temp_file = tempfile.NamedTemporaryFile( + prefix=f"paimon-faiss-{uuid.uuid4()}-", + suffix=".faiss", + delete=False + ) Review Comment: The temporary file is created with delete=False but cleanup relies on manual tracking in _local_index_files and the close() method. 
If an exception occurs before the file is added to _local_index_files (line 250), it will leak. Note that wrapping the call in a `with` block alone does not fix this: with delete=False, exiting the context manager only closes the handle and leaves the file on disk. Instead, add the path to the tracking list immediately after creation, or wrap the subsequent download/write in a try/except that removes the file (e.g. os.remove) before re-raising. ########## paimon-python/pypaimon/globalindex/faiss/faiss_index.py: ########## @@ -0,0 +1,235 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+################################################################################ + +from typing import Tuple + +import faiss +import numpy as np + +from pypaimon.globalindex.faiss.faiss_options import FaissVectorMetric, FaissIndexType + + +def _to_faiss_metric(metric: FaissVectorMetric) -> int: + """Convert FaissVectorMetric enum to FAISS integer metric type.""" + if metric == FaissVectorMetric.L2: + return faiss.METRIC_L2 + else: + return faiss.METRIC_INNER_PRODUCT + + +class FaissIndex: + + def __init__( + self, + index, + dimension: int, + metric: FaissVectorMetric, + index_type: FaissIndexType + ): + self._index = index + self._dimension = dimension + self._metric = metric + self._index_type = index_type + self._closed = False + + @classmethod + def create_flat_index(cls, dimension: int, metric: FaissVectorMetric) -> 'FaissIndex': + index = faiss.IndexFlatL2(dimension) if metric == FaissVectorMetric.L2 else faiss.IndexFlatIP(dimension) + index = faiss.IndexIDMap(index) + + return cls(index, dimension, metric, FaissIndexType.FLAT) + + @classmethod + def create_hnsw_index( + cls, + dimension: int, + m: int, + ef_construction: int, + metric: FaissVectorMetric + ) -> 'FaissIndex': + faiss_metric = _to_faiss_metric(metric) + index = faiss.IndexHNSWFlat(dimension, m, faiss_metric) + index.hnsw.efConstruction = ef_construction + index = faiss.IndexIDMap2(index) + + return cls(index, dimension, metric, FaissIndexType.HNSW) + + @classmethod + def create_ivf_index( + cls, + dimension: int, + nlist: int, + metric: FaissVectorMetric + ) -> 'FaissIndex': + faiss_metric = _to_faiss_metric(metric) + quantizer = faiss.IndexFlatL2(dimension) if metric == FaissVectorMetric.L2 else faiss.IndexFlatIP(dimension) + index = faiss.IndexIVFFlat(quantizer, dimension, nlist, faiss_metric) + index = faiss.IndexIDMap(index) + + return cls(index, dimension, metric, FaissIndexType.IVF) + + @classmethod + def create_ivf_pq_index( + cls, + dimension: int, + nlist: int, + m: int, + nbits: int, 
+ metric: FaissVectorMetric + ) -> 'FaissIndex': + quantizer = faiss.IndexFlatL2(dimension) if metric == FaissVectorMetric.L2 else faiss.IndexFlatIP(dimension) + index = faiss.IndexIVFPQ(quantizer, dimension, nlist, m, nbits) + index = faiss.IndexIDMap(index) + + return cls(index, dimension, metric, FaissIndexType.IVF_PQ) + + @classmethod + def create_ivf_sq8_index( + cls, + dimension: int, + nlist: int, + metric: FaissVectorMetric + ) -> 'FaissIndex': + faiss_metric = _to_faiss_metric(metric) + quantizer = faiss.IndexFlatL2(dimension) if metric == FaissVectorMetric.L2 else faiss.IndexFlatIP(dimension) + index = faiss.IndexIVFScalarQuantizer( + quantizer, dimension, nlist, + faiss.ScalarQuantizer.QT_8bit, faiss_metric + ) + index = faiss.IndexIDMap(index) + + return cls(index, dimension, metric, FaissIndexType.IVF_SQ8) + + @classmethod + def from_file(cls, file_path: str) -> 'FaissIndex': + index = faiss.read_index(file_path, faiss.IO_FLAG_MMAP) + index_type, metric = cls._detect_index_type(index) + + return cls(index, index.d, metric, index_type) Review Comment: The `from_file` method uses `faiss.IO_FLAG_MMAP` for memory-mapped file reading, which has implications for memory usage and file locking. Add a docstring explaining this behavior and when memory-mapping is appropriate versus loading the index into memory. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
