This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 826c674c4a [python] Fix RecursionError in vector search with many splits (#7823)
826c674c4a is described below
commit 826c674c4ad69ac41782c6a7a2f198e2830dd1c7
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed May 13 17:53:19 2026 +0800
[python] Fix RecursionError in vector search with many splits (#7823)
---
.../pypaimon/globalindex/vector_search_result.py | 15 +--
.../pypaimon/table/source/full_text_read.py | 13 ++-
.../pypaimon/table/source/vector_search_read.py | 11 +-
.../pypaimon/tests/vector_search_filter_test.py | 122 +++++++++++++++++++++
4 files changed, 145 insertions(+), 16 deletions(-)
diff --git a/paimon-python/pypaimon/globalindex/vector_search_result.py b/paimon-python/pypaimon/globalindex/vector_search_result.py
index a93247e690..5e6d2fb464 100644
--- a/paimon-python/pypaimon/globalindex/vector_search_result.py
+++ b/paimon-python/pypaimon/globalindex/vector_search_result.py
@@ -70,13 +70,14 @@ class ScoredGlobalIndexResult(GlobalIndexResult):
other_score_getter = other.score_getter()
result_or = RoaringBitmap64.or_(this_row_ids, other_row_ids)
-
- def combined_score_getter(row_id: int) -> Optional[float]:
- if row_id in this_row_ids:
- return this_score_getter(row_id)
- return other_score_getter(row_id)
-
- return SimpleScoredGlobalIndexResult(result_or, combined_score_getter)
+
+ merged_scores = {}
+ for row_id in other_row_ids:
+ merged_scores[row_id] = other_score_getter(row_id)
+ for row_id in this_row_ids:
+ merged_scores[row_id] = this_score_getter(row_id)
+
+ return SimpleScoredGlobalIndexResult(result_or, lambda row_id: merged_scores.get(row_id))
def top_k(self, k: int) -> 'ScoredGlobalIndexResult':
"""Return the top-k results by score."""
diff --git a/paimon-python/pypaimon/table/source/full_text_read.py b/paimon-python/pypaimon/table/source/full_text_read.py
index 9be1c2e4ca..f8fb1373fb 100644
--- a/paimon-python/pypaimon/table/source/full_text_read.py
+++ b/paimon-python/pypaimon/table/source/full_text_read.py
@@ -25,7 +25,7 @@ from pypaimon.globalindex.full_text_search import FullTextSearch
from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
from pypaimon.globalindex.global_index_result import GlobalIndexResult
from pypaimon.globalindex.offset_global_index_reader import OffsetGlobalIndexReader
-from pypaimon.globalindex.vector_search_result import ScoredGlobalIndexResult
+from pypaimon.globalindex.vector_search_result import DictBasedScoredIndexResult
from pypaimon.table.source.full_text_search_split import FullTextSearchSplit
from pypaimon.table.source.full_text_scan import FullTextScanPlan
@@ -60,19 +60,22 @@ class FullTextReadImpl(FullTextRead):
if not splits:
return GlobalIndexResult.create_empty()
- result = ScoredGlobalIndexResult.create_empty()
+ merged_scores = {}
for split in splits:
split_result = self._eval(
split.row_range_start, split.row_range_end,
split.full_text_index_files
)
if split_result is not None:
- result = result.or_(split_result)
+ score_getter = split_result.score_getter()
+ for row_id in split_result.results():
+ if row_id not in merged_scores:
+ merged_scores[row_id] = score_getter(row_id)
- return result.top_k(self._limit)
+ return DictBasedScoredIndexResult(merged_scores).top_k(self._limit)
def _eval(self, row_range_start, row_range_end, full_text_index_files
- ) -> Optional[ScoredGlobalIndexResult]:
+ ) -> Optional[GlobalIndexResult]:
index_io_meta_list = []
for index_file in full_text_index_files:
meta = index_file.global_index_meta
diff --git a/paimon-python/pypaimon/table/source/vector_search_read.py b/paimon-python/pypaimon/table/source/vector_search_read.py
index 24c399a192..3c4b3f8e1d 100644
--- a/paimon-python/pypaimon/table/source/vector_search_read.py
+++ b/paimon-python/pypaimon/table/source/vector_search_read.py
@@ -24,7 +24,7 @@ from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
from pypaimon.globalindex.global_index_result import GlobalIndexResult
from pypaimon.globalindex.offset_global_index_reader import OffsetGlobalIndexReader
from pypaimon.globalindex.vector_search import VectorSearch
-from pypaimon.globalindex.vector_search_result import ScoredGlobalIndexResult
+from pypaimon.globalindex.vector_search_result import DictBasedScoredIndexResult
class VectorSearchRead(ABC):
@@ -57,16 +57,19 @@ class VectorSearchReadImpl(VectorSearchRead):
pre_filter = self._pre_filter(splits)
- result = ScoredGlobalIndexResult.create_empty()
+ merged_scores = {}
for split in splits:
split_result = self._eval(
split.row_range_start, split.row_range_end,
split.vector_index_files, pre_filter
)
if split_result is not None:
- result = result.or_(split_result)
+ score_getter = split_result.score_getter()
+ for row_id in split_result.results():
+ if row_id not in merged_scores:
+ merged_scores[row_id] = score_getter(row_id)
- return result.top_k(self._limit)
+ return DictBasedScoredIndexResult(merged_scores).top_k(self._limit)
def _pre_filter(self, splits):
# type: (list) -> Optional[RoaringBitmap64]
diff --git a/paimon-python/pypaimon/tests/vector_search_filter_test.py b/paimon-python/pypaimon/tests/vector_search_filter_test.py
index 857c62e286..6a136f7a0d 100644
--- a/paimon-python/pypaimon/tests/vector_search_filter_test.py
+++ b/paimon-python/pypaimon/tests/vector_search_filter_test.py
@@ -541,5 +541,127 @@ class VectorSearchPartitionedFilterTest(unittest.TestCase):
self.assertIn("non-partition", str(ctx.exception))
+class VectorSearchManySplitsTest(unittest.TestCase):
+
+ def test_vector_search_with_many_splits(self):
+ from pypaimon.globalindex.vector_search_result import (
+ DictBasedScoredIndexResult,
+ )
+ from pypaimon.table.source.vector_search_read import VectorSearchReadImpl
+ from pypaimon.table.source.vector_search_split import VectorSearchSplit
+
+ num_splits = 1200
+ embedding_field = _field(1, "embedding", "FLOAT")
+ entries = [
+ _entry(None, field_id=1, index_type="lumina-vector-ann",
+ file_name="vec-%d.index" % i,
+ row_range_start=i, row_range_end=i)
+ for i in range(num_splits)
+ ]
+ table = _StubTable(fields=[embedding_field], entries=entries)
+ _patch_snapshot(self, entries)
+
+ def _fake_create(index_type, file_io, index_path,
+ index_io_meta_list, options=None):
+ row_id = index_io_meta_list[0].file_name
+ row_id = int(row_id.split("-")[1].split(".")[0])
+
+ class _FakeReader:
+ def visit_vector_search(self_inner, vs):
+ return DictBasedScoredIndexResult({row_id: float(row_id)})
+
+ def close(self_inner):
+ pass
+
+ def __enter__(self_inner):
+ return self_inner
+
+ def __exit__(self_inner, *a):
+ return False
+ return _FakeReader()
+
+ splits = [
+ VectorSearchSplit(
+ row_range_start=i, row_range_end=i,
+ vector_index_files=[entries[i].index_file])
+ for i in range(num_splits)
+ ]
+
+ with mock.patch(
+ "pypaimon.table.source.vector_search_read._create_vector_reader",
+ side_effect=_fake_create):
+ reader = VectorSearchReadImpl(
+ table, limit=10, vector_column=embedding_field,
+ query_vector=[1.0], filter_=None)
+ result = reader.read(splits)
+
+ self.assertLessEqual(result.results().cardinality(), 10)
+ self.assertEqual(result.results().cardinality(), 10)
+ scores = sorted(result.score_getter()(rid) for rid in result.results())
+ self.assertEqual(scores, [float(i) for i in range(1190, 1200)])
+
+ def tearDown(self):
+ mock.patch.stopall()
+
+
+class FullTextSearchManySplitsTest(unittest.TestCase):
+
+ def test_full_text_search_with_many_splits(self):
+ from pypaimon.globalindex.vector_search_result import (
+ DictBasedScoredIndexResult,
+ )
+ from pypaimon.table.source.full_text_read import FullTextReadImpl
+ from pypaimon.table.source.full_text_search_split import (
+ FullTextSearchSplit,
+ )
+
+ num_splits = 1200
+ text_field = _field(1, "content", "STRING")
+ entries = [
+ _entry(None, field_id=1, index_type="tantivy-fulltext",
+ file_name="ft-%d.index" % i,
+ row_range_start=i, row_range_end=i)
+ for i in range(num_splits)
+ ]
+ table = _StubTable(fields=[text_field], entries=entries)
+ _patch_snapshot(self, entries)
+
+ def _fake_create(index_type, file_io, index_path,
+ index_io_meta_list):
+ row_id = index_io_meta_list[0].file_name
+ row_id = int(row_id.split("-")[1].split(".")[0])
+
+ class _FakeReader:
+ def visit_full_text_search(self_inner, fts):
+ return DictBasedScoredIndexResult({row_id: float(row_id)})
+
+ def close(self_inner):
+ pass
+ return _FakeReader()
+
+ splits = [
+ FullTextSearchSplit(
+ row_range_start=i, row_range_end=i,
+ full_text_index_files=[entries[i].index_file])
+ for i in range(num_splits)
+ ]
+
+ with mock.patch(
+ "pypaimon.table.source.full_text_read._create_full_text_reader",
+ side_effect=_fake_create):
+ reader = FullTextReadImpl(
+ table, limit=10, text_column=text_field,
+ query_text="test")
+ result = reader.read(splits)
+
+ self.assertLessEqual(result.results().cardinality(), 10)
+ self.assertEqual(result.results().cardinality(), 10)
+ scores = sorted(result.score_getter()(rid) for rid in result.results())
+ self.assertEqual(scores, [float(i) for i in range(1190, 1200)])
+
+ def tearDown(self):
+ mock.patch.stopall()
+
+
if __name__ == "__main__":
unittest.main()