This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 8ef5214160fd9d5027d808711b8abbf7d86b199c
Author: JingsongLi <[email protected]>
AuthorDate: Tue Oct 21 21:47:50 2025 +0800

    [Python] filter_manifest_entry should not apply evolution to primary keys
---
 .../pypaimon/read/scanner/full_starting_scanner.py | 35 +++++++++++-----------
 paimon-python/pypaimon/table/row/projected_row.py  |  1 -
 paimon-python/pypaimon/write/batch_table_write.py  |  6 ++--
 3 files changed, 19 insertions(+), 23 deletions(-)

diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index b94364db99..dc1b178135 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -235,29 +235,28 @@ class FullStartingScanner(StartingScanner):
 
         # Apply evolution to stats
         if self.table.is_primary_key_table:
-            predicate = self.primary_key_predicate
-            stats = entry.file.key_stats
-            stats_fields = None
+            if not self.primary_key_predicate:
+                return True
+            return self.primary_key_predicate.test_by_simple_stats(
+                entry.file.key_stats,
+                entry.file.row_count
+            )
         else:
-            predicate = self.predicate
-            stats = entry.file.value_stats
+            if not self.predicate:
+                return True
             if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
                 stats_fields = entry.file.write_cols
             else:
                 stats_fields = entry.file.value_stats_cols
-        if not predicate:
-            return True
-        evolved_stats = evolution.evolution(
-            stats,
-            entry.file.row_count,
-            stats_fields
-        )
-
-        # Test predicate against evolved stats
-        return predicate.test_by_simple_stats(
-            evolved_stats,
-            entry.file.row_count
-        )
+            evolved_stats = evolution.evolution(
+                entry.file.value_stats,
+                entry.file.row_count,
+                stats_fields
+            )
+            return self.predicate.test_by_simple_stats(
+                evolved_stats,
+                entry.file.row_count
+            )
 
     def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
         partitioned_files = defaultdict(list)
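
Net effect of this hunk: the stats check is split into two independent paths. Primary-key predicates are now evaluated directly against the file's key stats, with no schema evolution step; value predicates keep the evolution step before testing. A condensed sketch of the resulting control flow (the enclosing method's name and its evolution parameter are not visible in the hunk, so both are assumed here):

    # Sketch only: `filter_by_stats` and the `evolution` argument are assumed
    # names; the rest mirrors the hunk above.
    def filter_by_stats(self, entry, evolution) -> bool:
        if self.table.is_primary_key_table:
            # Primary keys are not schema-evolved: test key_stats directly.
            if not self.primary_key_predicate:
                return True
            return self.primary_key_predicate.test_by_simple_stats(
                entry.file.key_stats, entry.file.row_count)
        # Value stats may have been written under an older schema, so
        # evolve them before testing the predicate.
        if not self.predicate:
            return True
        if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
            stats_fields = entry.file.write_cols
        else:
            stats_fields = entry.file.value_stats_cols
        evolved_stats = evolution.evolution(
            entry.file.value_stats, entry.file.row_count, stats_fields)
        return self.predicate.test_by_simple_stats(
            evolved_stats, entry.file.row_count)
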
diff --git a/paimon-python/pypaimon/table/row/projected_row.py b/paimon-python/pypaimon/table/row/projected_row.py
index d7a1cc6f40..dff63c372c 100644
--- a/paimon-python/pypaimon/table/row/projected_row.py
+++ b/paimon-python/pypaimon/table/row/projected_row.py
@@ -46,7 +46,6 @@ class ProjectedRow(InternalRow):
     def get_field(self, pos: int) -> Any:
         """Returns the value at the given position."""
         if self.index_mapping[pos] < 0:
-            # TODO move this logical to hive
             return None
         return self.row.get_field(self.index_mapping[pos])
 
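The projected_row.py change only drops a stale TODO; behavior is unchanged: a negative entry in index_mapping means the projected position has no source column, so get_field returns None instead of delegating. A hypothetical illustration (the construction shape is assumed; only get_field's contract comes from the code above):

    # Hypothetical: ProjectedRow construction details are assumed.
    projected = ProjectedRow(index_mapping=[2, -1, 0], row=underlying_row)
    projected.get_field(0)   # delegates to underlying position 2
    projected.get_field(1)   # negative mapping -> returns None
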
diff --git a/paimon-python/pypaimon/write/batch_table_write.py b/paimon-python/pypaimon/write/batch_table_write.py
index a71e9c0503..f8d0660bfa 100644
--- a/paimon-python/pypaimon/write/batch_table_write.py
+++ b/paimon-python/pypaimon/write/batch_table_write.py
@@ -36,14 +36,12 @@ class BatchTableWrite:
         self.row_key_extractor = self.table.create_row_key_extractor()
         self.batch_committed = False
 
-    def write_arrow(self, table: pa.Table, row_kind: List[int] = None):
-        # TODO: support row_kind
+    def write_arrow(self, table: pa.Table):
         batches_iterator = table.to_batches()
         for batch in batches_iterator:
             self.write_arrow_batch(batch)
 
-    def write_arrow_batch(self, data: pa.RecordBatch, row_kind: List[int] = None):
-        # TODO: support row_kind
+    def write_arrow_batch(self, data: pa.RecordBatch):
         self._validate_pyarrow_schema(data.schema)
         partitions, buckets = self.row_key_extractor.extract_partition_bucket_batch(data)
 
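With the never-implemented row_kind parameter dropped, the batch write path takes just the Arrow data: write_arrow splits a pa.Table into record batches and routes each through write_arrow_batch. A minimal usage sketch (the builder calls are assumed to follow pypaimon's batch-write API; commit wiring is abbreviated):

    import pyarrow as pa

    data = pa.table({"id": [1, 2], "name": ["a", "b"]})
    write = table.new_batch_write_builder().new_write()  # assumed builder API
    write.write_arrow(data)   # iterates data.to_batches() internally
    # ...collect commit messages and commit via the matching table commit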
