This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 8ef5214160fd9d5027d808711b8abbf7d86b199c
Author: JingsongLi <[email protected]>
AuthorDate: Tue Oct 21 21:47:50 2025 +0800

    [Python] filter_manifest_entry should not evolution primary keys
---
 .../pypaimon/read/scanner/full_starting_scanner.py | 35 +++++++++++-----------
 paimon-python/pypaimon/table/row/projected_row.py  |  1 -
 paimon-python/pypaimon/write/batch_table_write.py  |  6 ++--
 3 files changed, 19 insertions(+), 23 deletions(-)

diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index b94364db99..dc1b178135 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -235,29 +235,28 @@ class FullStartingScanner(StartingScanner):
 
         # Apply evolution to stats
         if self.table.is_primary_key_table:
-            predicate = self.primary_key_predicate
-            stats = entry.file.key_stats
-            stats_fields = None
+            if not self.primary_key_predicate:
+                return True
+            return self.primary_key_predicate.test_by_simple_stats(
+                entry.file.key_stats,
+                entry.file.row_count
+            )
         else:
-            predicate = self.predicate
-            stats = entry.file.value_stats
+            if not self.predicate:
+                return True
             if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
                 stats_fields = entry.file.write_cols
             else:
                 stats_fields = entry.file.value_stats_cols
-        if not predicate:
-            return True
-        evolved_stats = evolution.evolution(
-            stats,
-            entry.file.row_count,
-            stats_fields
-        )
-
-        # Test predicate against evolved stats
-        return predicate.test_by_simple_stats(
-            evolved_stats,
-            entry.file.row_count
-        )
+            evolved_stats = evolution.evolution(
+                entry.file.value_stats,
+                entry.file.row_count,
+                stats_fields
+            )
+            return self.predicate.test_by_simple_stats(
+                evolved_stats,
+                entry.file.row_count
+            )
 
     def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
         partitioned_files = defaultdict(list)
diff --git a/paimon-python/pypaimon/table/row/projected_row.py b/paimon-python/pypaimon/table/row/projected_row.py
index d7a1cc6f40..dff63c372c 100644
--- a/paimon-python/pypaimon/table/row/projected_row.py
+++ b/paimon-python/pypaimon/table/row/projected_row.py
@@ -46,7 +46,6 @@ class ProjectedRow(InternalRow):
     def get_field(self, pos: int) -> Any:
         """Returns the value at the given position."""
         if self.index_mapping[pos] < 0:
-            # TODO move this logical to hive
             return None
         return self.row.get_field(self.index_mapping[pos])
 
diff --git a/paimon-python/pypaimon/write/batch_table_write.py b/paimon-python/pypaimon/write/batch_table_write.py
index a71e9c0503..f8d0660bfa 100644
--- a/paimon-python/pypaimon/write/batch_table_write.py
+++ b/paimon-python/pypaimon/write/batch_table_write.py
@@ -36,14 +36,12 @@ class BatchTableWrite:
         self.row_key_extractor = self.table.create_row_key_extractor()
         self.batch_committed = False
 
-    def write_arrow(self, table: pa.Table, row_kind: List[int] = None):
-        # TODO: support row_kind
+    def write_arrow(self, table: pa.Table):
         batches_iterator = table.to_batches()
         for batch in batches_iterator:
             self.write_arrow_batch(batch)
 
-    def write_arrow_batch(self, data: pa.RecordBatch, row_kind: List[int] = None):
-        # TODO: support row_kind
+    def write_arrow_batch(self, data: pa.RecordBatch):
         self._validate_pyarrow_schema(data.schema)
 
         partitions, buckets = self.row_key_extractor.extract_partition_bucket_batch(data)
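
A note on the full_starting_scanner.py hunk above: on a primary-key table the key
columns are fixed for the table's lifetime, so the key predicate can be tested
against entry.file.key_stats as-is; only value stats may have been written under
an older schema and must pass through evolution.evolution before
test_by_simple_stats. The sketch below is a minimal, self-contained illustration
of that rule; FileStats, evolve_stats, overlaps and keep_file are hypothetical
names invented for this example, not the pypaimon API:

    from dataclasses import dataclass
    from typing import List, Optional

    @dataclass
    class FileStats:
        # Per-column min/max for one data file, in the file's own column order.
        columns: List[str]
        min_values: List[Optional[int]]
        max_values: List[Optional[int]]

    def evolve_stats(stats: FileStats, current_columns: List[str]) -> FileStats:
        # Map a file's stats onto the current schema; columns the file does not
        # have become unbounded (None), so predicates on them can never prune it.
        index = {c: i for i, c in enumerate(stats.columns)}
        mins, maxs = [], []
        for col in current_columns:
            i = index.get(col)
            mins.append(stats.min_values[i] if i is not None else None)
            maxs.append(stats.max_values[i] if i is not None else None)
        return FileStats(current_columns, mins, maxs)

    def overlaps(stats: FileStats, col: str, lo: int, hi: int) -> bool:
        # Keep the file if its [min, max] for `col` can intersect [lo, hi];
        # unbounded stats always keep the file.
        i = stats.columns.index(col)
        mn, mx = stats.min_values[i], stats.max_values[i]
        if mn is None or mx is None:
            return True
        return mn <= hi and mx >= lo

    def keep_file(is_primary_key_table: bool, key_stats: FileStats,
                  value_stats: FileStats, current_columns: List[str],
                  col: str, lo: int, hi: int) -> bool:
        if is_primary_key_table:
            # Key columns never undergo schema evolution: test key stats raw.
            return overlaps(key_stats, col, lo, hi)
        # Value columns may have been added/reordered: evolve stats first.
        return overlaps(evolve_stats(value_stats, current_columns), col, lo, hi)

    # A file written before column "b" existed cannot be pruned on "b",
    # but can still be pruned on "a" when the ranges cannot intersect.
    old_file = FileStats(columns=["a"], min_values=[1], max_values=[10])
    assert keep_file(False, old_file, old_file, ["a", "b"], "b", 5, 7)
    assert not keep_file(False, old_file, old_file, ["a", "b"], "a", 20, 30)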

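A caller-facing note on the batch_table_write.py hunk: the never-implemented
row_kind parameter is removed from both write methods, so any call that still
passes it will now raise a TypeError. A minimal sketch of the new call shape,
assuming a BatchTableWrite named writer obtained elsewhere (catalog and table
setup elided, so those lines are commented out):

    import pyarrow as pa

    data = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})

    # writer is a pypaimon BatchTableWrite; setup elided.
    # writer.write_arrow(data)                        # still valid
    # writer.write_arrow(data, row_kind=[0, 0, 0])    # TypeError after this commit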