This is an automated email from the ASF dual-hosted git repository.

lsomeyeah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7275b0a60a [python] Introduce file pruning for dv pk table (#7557)
7275b0a60a is described below

commit 7275b0a60ae1ebd4397ed1106fc85981a2ec18d3
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 30 21:42:26 2026 +0800

    [python] Introduce file pruning for dv pk table (#7557)
    
    ### Purpose
    
    In the primary key table scan, when DV is enabled, the logic of
    filtering files by value stats has been added.
    
    ### Tests
    
    Added test_pk_dv_read_multi_batch_with_value_filter in
    java_py_read_write_test.py.
---
 .../pypaimon/read/scanner/file_scanner.py          | 30 ++++++++++++----
 .../pypaimon/tests/e2e/java_py_read_write_test.py  | 42 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 6 deletions(-)

diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py
index a5e8576802..aea709669e 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -381,12 +381,30 @@ class FileScanner:
         if self.table.is_primary_key_table:
             if self.deletion_vectors_enabled and entry.file.level == 0:  # do not read level 0 file
                 return False
-            if not self.primary_key_predicate:
-                return True
-            return self.primary_key_predicate.test_by_simple_stats(
-                entry.file.key_stats,
-                entry.file.row_count
-            )
+            if self.primary_key_predicate:
+                if not self.primary_key_predicate.test_by_simple_stats(
+                    entry.file.key_stats,
+                    entry.file.row_count
+                ):
+                    return False
+            # In DV mode, files within a bucket don't overlap (level 0 excluded above),
+            # so we can safely filter by value stats per file.
+            if self.deletion_vectors_enabled and self.predicate_for_stats:
+                if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
+                    stats_fields = entry.file.write_cols
+                else:
+                    stats_fields = entry.file.value_stats_cols
+                evolved_stats = evolution.evolution(
+                    entry.file.value_stats,
+                    entry.file.row_count,
+                    stats_fields
+                )
+                if not self.predicate_for_stats.test_by_simple_stats(
+                    evolved_stats,
+                    entry.file.row_count
+                ):
+                    return False
+            return True
         else:
             if not self.predicate:
                 return True
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 0732925bae..4d888c3cb4 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -322,6 +322,48 @@ class JavaPyReadWriteTest(unittest.TestCase):
         }, schema=pa_schema)
         self.assertEqual(expected, actual)
 
+    def test_pk_dv_read_multi_batch_with_value_filter(self):
+        """Test that DV-enabled PK table filters files by value stats during scan."""
+        pa_schema = pa.schema([
+            pa.field('pt', pa.int32(), nullable=False),
+            pa.field('a', pa.int32(), nullable=False),
+            ('b', pa.int64())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema,
+                                            partition_keys=['pt'],
+                                            primary_keys=['pt', 'a'],
+                                            options={'bucket': '1'})
+        self.catalog.create_table('default.test_pk_dv_multi_batch', schema, True)
+        table = self.catalog.get_table('default.test_pk_dv_multi_batch')
+
+        # Unfiltered scan: count total files
+        rb_all = table.new_read_builder()
+        all_splits = rb_all.new_scan().plan().splits()
+        all_files_count = sum(len(s.files) for s in all_splits)
+
+        # Filtered scan: b < 500 should only match a few rows (b in {100,200,300,400})
+        # and prune files whose value stats don't overlap
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        pred = predicate_builder.less_than('b', 500)
+        rb_filtered = table.new_read_builder().with_filter(pred)
+        filtered_splits = rb_filtered.new_scan().plan().splits()
+        filtered_files_count = sum(len(s.files) for s in filtered_splits)
+        filtered_result = table_sort_by(
+            rb_filtered.new_read().to_arrow(filtered_splits), 'a')
+
+        # Verify correctness: rows with b < 500 are a=10,b=100 / a=20,b=200 / a=30,b=300 / a=40,b=400
+        expected = pa.Table.from_pydict({
+            'pt': [1, 1, 1, 1],
+            'a': [10, 20, 30, 40],
+            'b': [100, 200, 300, 400]
+        }, schema=pa_schema)
+        self.assertEqual(expected, filtered_result)
+
+        # Verify file pruning: filtered scan should read fewer files
+        self.assertLess(
+            filtered_files_count, all_files_count,
+            f"DV value filter should prune files: 
filtered={filtered_files_count}, all={all_files_count}")
+
     def test_pk_dv_read_multi_batch_raw_convertable(self):
         pa_schema = pa.schema([
             pa.field('pt', pa.int32(), nullable=False),

Reply via email to