JingsongLi commented on code in PR #7579:
URL: https://github.com/apache/paimon/pull/7579#discussion_r3045284425


##########
paimon-python/pypaimon/table/source/vector_search_read.py:
##########
@@ -0,0 +1,117 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Vector search read to read index files."""
+
+from abc import ABC, abstractmethod
+
+from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.globalindex.offset_global_index_reader import 
OffsetGlobalIndexReader
+from pypaimon.globalindex.vector_search import VectorSearch
+from pypaimon.globalindex.vector_search_result import ScoredGlobalIndexResult
+
+
+class VectorSearchRead(ABC):
+    """Vector search read to read index files."""
+
+    def read_plan(self, plan):
+        # type: (VectorSearchScanPlan) -> GlobalIndexResult
+        return self.read(plan.splits())
+
+    @abstractmethod
+    def read(self, splits):
+        # type: (List[VectorSearchSplit]) -> GlobalIndexResult
+        pass
+
+
+class VectorSearchReadImpl(VectorSearchRead):
+    """Implementation for VectorSearchRead."""
+
+    def __init__(self, table, limit, vector_column, query_vector):
+        self._table = table
+        self._limit = limit
+        self._vector_column = vector_column
+        self._query_vector = query_vector
+
+    def read(self, splits):
+        # type: (List[VectorSearchSplit]) -> GlobalIndexResult
+        if not splits:
+            return GlobalIndexResult.create_empty()
+
+        result = ScoredGlobalIndexResult.create_empty()
+        for split in splits:
+            split_result = self._eval(
+                split.row_range_start, split.row_range_end,
+                split.vector_index_files
+            )
+            if split_result is not None:
+                result = result.or_(split_result)
+
+        return result.top_k(self._limit)
+
+    def _eval(self, row_range_start, row_range_end, vector_index_files):
+        # type: (int, int, list) -> Optional[ScoredGlobalIndexResult]
+        if not vector_index_files:
+            return None
+        index_io_meta_list = []
+        for index_file in vector_index_files:
+            meta = index_file.global_index_meta
+            assert meta is not None
+            index_io_meta_list.append(
+                GlobalIndexIOMeta(
+                    file_name=index_file.file_name,
+                    file_size=index_file.file_size,
+                    metadata=meta.index_meta
+                )
+            )
+
+        index_type = vector_index_files[0].index_type
+        index_path = 
self._table.path_factory().global_index_path_factory().index_path()
+        file_io = self._table.file_io
+        options = self._table.table_schema.options
+
+        reader = _create_vector_reader(
+            index_type, file_io, index_path,
+            index_io_meta_list, options
+        )
+
+        vector_search = VectorSearch(
+            vector=self._query_vector,
+            limit=self._limit,
+            field_name=self._vector_column.name
+        )
+
+        try:

Review Comment:
   Use Python with.



##########
paimon-python/pypaimon/globalindex/lumina/lumina_vector_global_index_reader.py:
##########
@@ -0,0 +1,201 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Vector global index reader using Lumina (via lumina-data SDK).
+
+Each shard has exactly one Lumina index file. This reader lazy-loads the
+index and performs vector similarity search using the lumina-data SDK.
+"""
+
+import os
+
+from pypaimon.globalindex.global_index_reader import GlobalIndexReader
+from pypaimon.globalindex.vector_search_result import 
DictBasedScoredIndexResult
+
+LUMINA_VECTOR_ANN_IDENTIFIER = "lumina-vector-ann"
+
+MIN_SEARCH_LIST_SIZE = 16
+
+
+def _ensure_search_list_size(search_options, top_k):
+    """Set diskann.search.list_size when not explicitly configured."""
+    if "diskann.search.list_size" not in search_options:
+        list_size = max(int(top_k * 1.5), MIN_SEARCH_LIST_SIZE)
+        search_options["diskann.search.list_size"] = str(list_size)
+
+
+class LuminaVectorGlobalIndexReader(GlobalIndexReader):
+    """Vector global index reader using Lumina."""
+
+    def __init__(self, file_io, index_path, io_metas, options=None):
+        assert len(io_metas) == 1, "Expected exactly one index file per shard"
+        self._file_io = file_io
+        self._index_path = index_path
+        self._io_meta = io_metas[0]
+        self._options = options or {}
+        self._searcher = None
+        self._index_meta = None
+        self._search_options = None
+        self._stream = None
+
+    def visit_vector_search(self, vector_search):
+        self._ensure_loaded()
+
+        from lumina_data import MetricType
+
+        query = vector_search.vector
+        # Flatten to a plain list of floats for search_list API
+        if hasattr(query, 'tolist'):
+            query_flat = list(query.flatten()) if hasattr(query, 'flatten') 
else list(query)
+        else:
+            query_flat = list(query)
+        query_flat = [float(v) for v in query_flat]
+
+        expected_dim = self._index_meta.dim
+        if len(query_flat) != expected_dim:
+            raise ValueError(
+                "Query vector dimension mismatch: expected %d, got %d"
+                % (expected_dim, len(query_flat)))
+
+        limit = vector_search.limit
+        index_metric = self._index_meta.metric
+
+        count = self._searcher.get_count()
+        effective_k = min(limit, count)
+        if effective_k <= 0:
+            return None
+
+        include_row_ids = vector_search.include_row_ids
+
+        if include_row_ids is not None:
+            filter_id_list = list(include_row_ids)
+            if len(filter_id_list) == 0:
+                return None
+            effective_k = min(effective_k, len(filter_id_list))
+            search_opts = dict(self._search_options)
+            search_opts["search.thread_safe_filter"] = "true"
+            _ensure_search_list_size(search_opts, effective_k)
+            distances, labels = self._searcher.search_with_filter_list(
+                query_flat, 1, effective_k, filter_id_list, search_opts)
+        else:
+            search_opts = dict(self._search_options)
+            _ensure_search_list_size(search_opts, effective_k)
+            distances, labels = self._searcher.search_list(
+                query_flat, 1, effective_k, search_opts)
+
+        # Collect results with score conversion (same as Java collectResults)
+        SENTINEL = 0xFFFFFFFFFFFFFFFF
+        id_to_scores = {}
+        for i in range(effective_k):
+            row_id = labels[i]
+            if row_id == SENTINEL:
+                continue
+            score = MetricType.convert_distance_to_score(
+                float(distances[i]), index_metric)
+            id_to_scores[int(row_id)] = score
+
+        return DictBasedScoredIndexResult(id_to_scores)
+
+    def _ensure_loaded(self):
+        if self._searcher is not None:
+            return
+
+        from lumina_data import LuminaSearcher
+        from pypaimon.globalindex.lumina.lumina_index_meta import 
LuminaIndexMeta
+        from pypaimon.globalindex.lumina.lumina_vector_index_options import (
+            strip_lumina_options,
+        )
+
+        self._index_meta = LuminaIndexMeta.deserialize(self._io_meta.metadata)
+        # Merge paimon table options (prefix-stripped) with index metadata 
options;
+        # index metadata takes precedence as it reflects the actual built 
index.
+        searcher_options = strip_lumina_options(self._options)
+        searcher_options.update(self._index_meta.options)
+        self._search_options = searcher_options
+
+        file_path = os.path.join(self._index_path, self._io_meta.file_name)
+        stream = self._file_io.new_input_stream(file_path)
+        try:
+            self._searcher = LuminaSearcher(searcher_options)
+            self._searcher.open_stream(stream, self._io_meta.file_size)
+            self._stream = stream
+        except Exception:
+            stream.close()
+            raise
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+        return False
+
+    def close(self):
+        if self._searcher is not None:
+            self._searcher.close()
+            self._searcher = None
+        if self._stream is not None:
+            self._stream.close()
+            self._stream = None
+
+    # =================== unsupported =====================
+
+    def visit_equal(self, field_ref, literal):

Review Comment:
   Remove these useless methods, `GlobalIndexReader` already return None.



##########
paimon-python/pypaimon/globalindex/lumina/lumina_vector_global_index_reader.py:
##########
@@ -0,0 +1,201 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Vector global index reader using Lumina (via lumina-data SDK).
+
+Each shard has exactly one Lumina index file. This reader lazy-loads the
+index and performs vector similarity search using the lumina-data SDK.
+"""
+
+import os
+
+from pypaimon.globalindex.global_index_reader import GlobalIndexReader
+from pypaimon.globalindex.vector_search_result import 
DictBasedScoredIndexResult
+
+LUMINA_VECTOR_ANN_IDENTIFIER = "lumina-vector-ann"
+
+MIN_SEARCH_LIST_SIZE = 16
+
+
+def _ensure_search_list_size(search_options, top_k):
+    """Set diskann.search.list_size when not explicitly configured."""
+    if "diskann.search.list_size" not in search_options:
+        list_size = max(int(top_k * 1.5), MIN_SEARCH_LIST_SIZE)
+        search_options["diskann.search.list_size"] = str(list_size)
+
+
+class LuminaVectorGlobalIndexReader(GlobalIndexReader):
+    """Vector global index reader using Lumina."""
+
+    def __init__(self, file_io, index_path, io_metas, options=None):
+        assert len(io_metas) == 1, "Expected exactly one index file per shard"
+        self._file_io = file_io
+        self._index_path = index_path
+        self._io_meta = io_metas[0]
+        self._options = options or {}
+        self._searcher = None
+        self._index_meta = None
+        self._search_options = None
+        self._stream = None
+
+    def visit_vector_search(self, vector_search):
+        self._ensure_loaded()
+
+        from lumina_data import MetricType
+
+        query = vector_search.vector
+        # Flatten to a plain list of floats for search_list API
+        if hasattr(query, 'tolist'):

Review Comment:
   Just use `query.tolist()` is OK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to