This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 61179c7f11 [Python] SimpleStats supports BinaryRow (#6444)
61179c7f11 is described below

commit 61179c7f1111326f1767e20670510c4caba5040c
Author: umi <[email protected]>
AuthorDate: Tue Oct 21 20:29:42 2025 +0800

    [Python] SimpleStats supports BinaryRow (#6444)
---
 paimon-python/pypaimon/common/predicate.py         |  49 ++-
 .../pypaimon/manifest/manifest_file_manager.py     |  12 +-
 .../pypaimon/manifest/schema/simple_stats.py       |   5 +-
 .../pypaimon/manifest/simple_stats_evolution.py    | 124 ++++++++
 .../pypaimon/manifest/simple_stats_evolutions.py   |  76 +++++
 .../pypaimon/read/scanner/full_starting_scanner.py |  43 ++-
 paimon-python/pypaimon/schema/schema_manager.py    |   4 +-
 paimon-python/pypaimon/table/row/binary_row.py     |  61 ++++
 paimon-python/pypaimon/table/row/generic_row.py    |  30 +-
 paimon-python/pypaimon/table/row/projected_row.py  |  84 ++++++
 paimon-python/pypaimon/tests/binary_row_test.py    | 334 +++++++++++++++++++++
 paimon-python/pypaimon/tests/predicates_test.py    |  19 +-
 .../pypaimon/tests/py36/rest_ao_read_write_test.py |  26 +-
 paimon-python/pypaimon/tests/reader_base_test.py   |  28 +-
 14 files changed, 805 insertions(+), 90 deletions(-)

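For context, a minimal sketch (not part of the commit) of how the new stats path is exercised end to end. It assumes a hypothetical local warehouse path and the three-column schema used by the new tests below; a BinaryRow is what a manifest read now produces, but a GenericRow built in memory goes through the same InternalRow interface:

    import pyarrow as pa

    from pypaimon import CatalogFactory, Schema
    from pypaimon.manifest.schema.simple_stats import SimpleStats
    from pypaimon.table.row.generic_row import GenericRow

    # hypothetical warehouse path and table name; schema mirrors binary_row_test.py
    catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'})
    catalog.create_database('default', False)
    pa_schema = pa.schema([('f0', pa.int64()), ('f1', pa.string()), ('f2', pa.int64())])
    catalog.create_table('default.demo', Schema.from_pyarrow_schema(
        pa_schema, partition_keys=['f0'], options={'file.format': 'parquet'}), False)
    table = catalog.get_table('default.demo')

    predicate = table.new_read_builder().new_predicate_builder().less_than('f2', 8)

    # SimpleStats min/max are now typed as InternalRow: BinaryRow when deserialized
    # from a manifest file, or a GenericRow built in memory as here.
    stats = SimpleStats(
        min_values=GenericRow([6], [table.fields[2]]),
        max_values=GenericRow([10], [table.fields[2]]),
        null_counts=[0],
    )
    # Keeps the file: its f2 range [6, 10] can still contain rows with f2 < 8.
    predicate.test_by_simple_stats(stats, 5)
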
diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py
index 6a760e473f..5e47fdd5df 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -27,6 +27,7 @@ from pyarrow import compute as pyarrow_compute
 from pyarrow import dataset as pyarrow_dataset
 
 from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.table.row.generic_row import GenericRow
 from pypaimon.table.row.internal_row import InternalRow
 
 
@@ -67,32 +68,31 @@ class Predicate:
         raise ValueError(f"Unsupported predicate method: {self.method}")
 
     def test_by_simple_stats(self, stat: SimpleStats, row_count: int) -> bool:
-        return self.test_by_stats({
-            "min_values": stat.min_values.to_dict(),
-            "max_values": stat.max_values.to_dict(),
-            "null_counts": {
-                stat.min_values.fields[i].name: stat.null_counts[i] for i in range(len(stat.min_values.fields))
-            },
-            "row_count": row_count,
-        })
-
-    def test_by_stats(self, stat: Dict) -> bool:
+        """Test predicate against BinaryRow stats with denseIndexMapping like Java implementation."""
         if self.method == 'and':
-            return all(p.test_by_stats(stat) for p in self.literals)
+            return all(p.test_by_simple_stats(stat, row_count) for p in self.literals)
         if self.method == 'or':
-            t = any(p.test_by_stats(stat) for p in self.literals)
-            return t
+            return any(p.test_by_simple_stats(stat, row_count) for p in self.literals)
 
-        null_count = stat["null_counts"][self.field]
-        row_count = stat["row_count"]
+        # Get null count using the mapped index
+        null_count = stat.null_counts[self.index] if stat.null_counts and self.index < len(
+            stat.null_counts) else 0
 
         if self.method == 'isNull':
             return null_count is not None and null_count > 0
         if self.method == 'isNotNull':
             return null_count is None or row_count is None or null_count < row_count
 
-        min_value = stat["min_values"][self.field]
-        max_value = stat["max_values"][self.field]
+        if not isinstance(stat.min_values, GenericRow):
+            # Parse field values using BinaryRow's direct field access by name
+            min_value = stat.min_values.get_field(self.index)
+            max_value = stat.max_values.get_field(self.index)
+        else:
+            # TODO transform partition to BinaryRow
+            min_values = stat.min_values.to_dict()
+            max_values = stat.max_values.to_dict()
+            min_value = min_values[self.field]
+            max_value = max_values[self.field]
 
         if min_value is None or max_value is None or (null_count is not None and null_count == row_count):
             # invalid stats, skip validation
@@ -164,7 +164,6 @@ class RegisterMeta(ABCMeta):
 
 
 class Tester(ABC, metaclass=RegisterMeta):
-
     name = None
 
     @abstractmethod
@@ -187,7 +186,6 @@ class Tester(ABC, metaclass=RegisterMeta):
 
 
 class Equal(Tester):
-
     name = 'equal'
 
     def test_by_value(self, val, literals) -> bool:
@@ -201,7 +199,6 @@ class Equal(Tester):
 
 
 class NotEqual(Tester):
-
     name = "notEqual"
 
     def test_by_value(self, val, literals) -> bool:
@@ -215,7 +212,6 @@ class NotEqual(Tester):
 
 
 class LessThan(Tester):
-
     name = "lessThan"
 
     def test_by_value(self, val, literals) -> bool:
@@ -229,7 +225,6 @@ class LessThan(Tester):
 
 
 class LessOrEqual(Tester):
-
     name = "lessOrEqual"
 
     def test_by_value(self, val, literals) -> bool:
@@ -243,7 +238,6 @@ class LessOrEqual(Tester):
 
 
 class GreaterThan(Tester):
-
     name = "greaterThan"
 
     def test_by_value(self, val, literals) -> bool:
@@ -257,7 +251,6 @@ class GreaterThan(Tester):
 
 
 class GreaterOrEqual(Tester):
-
     name = "greaterOrEqual"
 
     def test_by_value(self, val, literals) -> bool:
@@ -271,7 +264,6 @@ class GreaterOrEqual(Tester):
 
 
 class In(Tester):
-
     name = "in"
 
     def test_by_value(self, val, literals) -> bool:
@@ -285,7 +277,6 @@ class In(Tester):
 
 
 class NotIn(Tester):
-
     name = "notIn"
 
     def test_by_value(self, val, literals) -> bool:
@@ -299,7 +290,6 @@ class NotIn(Tester):
 
 
 class Between(Tester):
-
     name = "between"
 
     def test_by_value(self, val, literals) -> bool:
@@ -313,7 +303,6 @@ class Between(Tester):
 
 
 class StartsWith(Tester):
-
     name = "startsWith"
 
     def test_by_value(self, val, literals) -> bool:
@@ -329,7 +318,6 @@ class StartsWith(Tester):
 
 
 class EndsWith(Tester):
-
     name = "endsWith"
 
     def test_by_value(self, val, literals) -> bool:
@@ -343,7 +331,6 @@ class EndsWith(Tester):
 
 
 class Contains(Tester):
-
     name = "contains"
 
     def test_by_value(self, val, literals) -> bool:
@@ -357,7 +344,6 @@ class Contains(Tester):
 
 
 class IsNull(Tester):
-
     name = "isNull"
 
     def test_by_value(self, val, literals) -> bool:
@@ -371,7 +357,6 @@ class IsNull(Tester):
 
 
 class IsNotNull(Tester):
-
     name = "isNotNull"
 
     def test_by_value(self, val, literals) -> bool:
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index bb9251df7e..c196845ff4 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -26,6 +26,7 @@ from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
 from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.table.row.generic_row import (GenericRowDeserializer,
                                             GenericRowSerializer)
+from pypaimon.table.row.binary_row import BinaryRow
 
 
 class ManifestFileManager:
@@ -54,12 +55,11 @@ class ManifestFileManager:
             file_dict = dict(record['_FILE'])
             key_dict = dict(file_dict['_KEY_STATS'])
             key_stats = SimpleStats(
-                min_values=GenericRowDeserializer.from_bytes(key_dict['_MIN_VALUES'],
-                                                             self.trimmed_primary_key_fields),
-                max_values=GenericRowDeserializer.from_bytes(key_dict['_MAX_VALUES'],
-                                                             self.trimmed_primary_key_fields),
+                min_values=BinaryRow(key_dict['_MIN_VALUES'], self.trimmed_primary_key_fields),
+                max_values=BinaryRow(key_dict['_MAX_VALUES'], self.trimmed_primary_key_fields),
                 null_counts=key_dict['_NULL_COUNTS'],
             )
+
             value_dict = dict(file_dict['_VALUE_STATS'])
             if file_dict['_VALUE_STATS_COLS'] is None:
                 if file_dict['_WRITE_COLS'] is None:
@@ -72,8 +72,8 @@ class ManifestFileManager:
             else:
                 fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']]
             value_stats = SimpleStats(
-                min_values=GenericRowDeserializer.from_bytes(value_dict['_MIN_VALUES'], fields),
-                max_values=GenericRowDeserializer.from_bytes(value_dict['_MAX_VALUES'], fields),
+                min_values=BinaryRow(value_dict['_MIN_VALUES'], fields),
+                max_values=BinaryRow(value_dict['_MAX_VALUES'], fields),
                 null_counts=value_dict['_NULL_COUNTS'],
             )
             file_meta = DataFileMeta(
diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py b/paimon-python/pypaimon/manifest/schema/simple_stats.py
index 19816fdd0f..1130a812fa 100644
--- a/paimon-python/pypaimon/manifest/schema/simple_stats.py
+++ b/paimon-python/pypaimon/manifest/schema/simple_stats.py
@@ -21,12 +21,13 @@ from typing import List, Optional
 from typing import ClassVar
 
 from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.row.internal_row import InternalRow
 
 
 @dataclass
 class SimpleStats:
-    min_values: GenericRow
-    max_values: GenericRow
+    min_values: InternalRow
+    max_values: InternalRow
     null_counts: Optional[List[int]]
 
     _empty_stats: ClassVar[object] = None
diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolution.py b/paimon-python/pypaimon/manifest/simple_stats_evolution.py
new file mode 100644
index 0000000000..56cea98a85
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/simple_stats_evolution.py
@@ -0,0 +1,124 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import List, Optional, Dict, Any
+import threading
+
+from pypaimon.schema.data_types import DataField
+from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.row.projected_row import ProjectedRow
+
+
+class SimpleStatsEvolution:
+    """Converter for array of SimpleColStats."""
+
+    def __init__(self, data_fields: List[DataField], index_mapping: Optional[List[int]],
+                 cast_field_getters: Optional[List[Any]]):
+        self.field_names = [field.name for field in data_fields]
+        self.index_mapping = index_mapping
+        self.cast_field_getters = cast_field_getters
+        self.index_mappings: Dict[tuple, List[int]] = {}
+        self._lock = threading.Lock()
+
+        # Create empty values for optimization
+        self.empty_values = GenericRow([None] * len(self.field_names), data_fields)
+        self.empty_null_counts = [0] * len(self.field_names)
+
+    def evolution(self, stats: SimpleStats, row_count: Optional[int],
+                  stats_fields: Optional[List[str]]) -> 'SimpleStats':
+        min_values = stats.min_values
+        max_values = stats.max_values
+        null_counts = stats.null_counts
+
+        if stats_fields is not None and not stats_fields:
+            # Optimize for empty dense fields
+            min_values = self.empty_values
+            max_values = self.empty_values
+            null_counts = self.empty_null_counts
+        elif stats_fields is not None:
+            # Apply dense field mapping
+            dense_index_mapping = self._get_dense_index_mapping(stats_fields)
+            min_values = self._project_row(min_values, dense_index_mapping)
+            max_values = self._project_row(max_values, dense_index_mapping)
+            null_counts = self._project_array(null_counts, dense_index_mapping)
+
+        if self.index_mapping is not None:
+            # TODO support schema evolution
+            min_values = self._project_row(min_values, self.index_mapping)
+            max_values = self._project_row(max_values, self.index_mapping)
+
+            if row_count is None:
+                raise RuntimeError("Schema Evolution for stats needs row count.")
+
+            null_counts = self._evolve_null_counts(null_counts, self.index_mapping, row_count)
+
+        return SimpleStats(min_values, max_values, null_counts)
+
+    def _get_dense_index_mapping(self, dense_fields: List[str]) -> List[int]:
+        """
+        Get dense index mapping similar to Java:
+        fieldNames.stream().mapToInt(denseFields::indexOf).toArray()
+        """
+        dense_fields_tuple = tuple(dense_fields)
+
+        if dense_fields_tuple not in self.index_mappings:
+            with self._lock:
+                # Double-check locking
+                if dense_fields_tuple not in self.index_mappings:
+                    mapping = []
+                    for field_name in self.field_names:
+                        try:
+                            index = dense_fields.index(field_name)
+                            mapping.append(index)
+                        except ValueError:
+                            mapping.append(-1)  # Field not found
+                    self.index_mappings[dense_fields_tuple] = mapping
+
+        return self.index_mappings[dense_fields_tuple]
+
+    def _project_row(self, row: Any, index_mapping: List[int]) -> Any:
+        """Project row based on index mapping using ProjectedRow."""
+        projected_row = ProjectedRow.from_index_mapping(index_mapping)
+        return projected_row.replace_row(row)
+
+    def _project_array(self, array: List[Any], index_mapping: List[int]) -> List[Any]:
+        """Project array based on index mapping."""
+        if not array:
+            return [0] * len(index_mapping)
+
+        projected = []
+        for mapped_index in index_mapping:
+            if mapped_index >= 0 and mapped_index < len(array):
+                projected.append(array[mapped_index])
+            else:
+                projected.append(0)  # Default value for missing fields
+
+        return projected
+
+    def _evolve_null_counts(self, null_counts: List[Any], index_mapping: List[int],
+                            not_found_value: int) -> List[Any]:
+        """Evolve null counts with schema evolution mapping."""
+        evolved = []
+        for mapped_index in index_mapping:
+            if mapped_index >= 0 and mapped_index < len(null_counts):
+                evolved.append(null_counts[mapped_index])
+            else:
+                evolved.append(not_found_value)  # Use row count for missing fields
+
+        return evolved
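
A rough sketch (not part of the commit) of what the dense-field handling above does, assuming `fields` is the table's List[DataField] for a three-column f0/f1/f2 table (for example `table.fields`) and that the file stored stats only for 'f2':

    from pypaimon.manifest.schema.simple_stats import SimpleStats
    from pypaimon.manifest.simple_stats_evolution import SimpleStatsEvolution
    from pypaimon.table.row.generic_row import GenericRow

    # fields: assumed List[DataField] for columns f0, f1, f2 (e.g. table.fields)
    evolution = SimpleStatsEvolution(fields, None, None)

    # the file stored stats only for the dense column list ['f2']
    dense = SimpleStats(GenericRow([6], [fields[2]]), GenericRow([10], [fields[2]]), [0])
    evolved = evolution.evolution(dense, 5, ['f2'])

    evolved.min_values.get_field(2)   # -> 6, read through the ProjectedRow mapping [-1, -1, 0]
    evolved.min_values.get_field(0)   # -> None, no stats were stored for f0
    evolved.null_counts               # -> [0, 0, 0]
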
diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
new file mode 100644
index 0000000000..8331b7a5e5
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
@@ -0,0 +1,76 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Callable, Dict, List, Optional
+
+from pypaimon.manifest.simple_stats_evolution import SimpleStatsEvolution
+from pypaimon.schema.data_types import DataField
+
+
+class SimpleStatsEvolutions:
+    """Converters to create col stats array serializer."""
+
+    def __init__(self, schema_fields: Callable[[int], List[DataField]], table_schema_id: int):
+        self.schema_fields = schema_fields
+        self.table_schema_id = table_schema_id
+        self.table_data_fields = schema_fields(table_schema_id)
+        self.table_fields = None
+        self.evolutions: Dict[int, SimpleStatsEvolution] = {}
+
+    def get_or_create(self, data_schema_id: int) -> SimpleStatsEvolution:
+        """Get or create SimpleStatsEvolution for given schema id."""
+        if data_schema_id in self.evolutions:
+            return self.evolutions[data_schema_id]
+
+        if self.table_schema_id == data_schema_id:
+            evolution = SimpleStatsEvolution(self.schema_fields(data_schema_id), None, None)
+        else:
+            # TODO support schema evolution
+            if self.table_fields is None:
+                self.table_fields = self.table_data_fields
+
+            data_fields = self.schema_fields(data_schema_id)
+            index_cast_mapping = self._create_index_cast_mapping(self.table_fields, data_fields)
+            index_mapping = index_cast_mapping.get('index_mapping')
+            cast_mapping = index_cast_mapping.get('cast_mapping')
+
+            evolution = SimpleStatsEvolution(data_fields, index_mapping, cast_mapping)
+
+        self.evolutions[data_schema_id] = evolution
+        return evolution
+
+    def _create_index_cast_mapping(self, table_fields: List[DataField],
+                                   data_fields: List[DataField]) -> Dict[str, Optional[List[int]]]:
+        """
+        Create index and cast mapping between table fields and data fields.
+        This is a simplified implementation.
+        """
+        # Create a mapping from field names to indices in data_fields
+        data_field_map = {field.name: i for i, field in enumerate(data_fields)}
+
+        index_mapping = []
+        for table_field in table_fields:
+            if table_field.name in data_field_map:
+                index_mapping.append(data_field_map[table_field.name])
+            else:
+                index_mapping.append(-1)  # Field not found in data schema
+
+        return {
+            'index_mapping': index_mapping,
+            'cast_mapping': None  # Simplified - no casting for now
+        }
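
A sketch of how these converters are wired up, mirroring the FullStartingScanner change below; it assumes a `table` object exposing schema_manager and table_schema as the scanner does:

    from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions

    schema_cache = {}

    def schema_fields_func(schema_id: int):
        # read and cache the schema for this id, as the scanner does
        if schema_id not in schema_cache:
            schema_cache[schema_id] = table.schema_manager.read_schema(schema_id)
        return schema_cache[schema_id].fields if schema_cache[schema_id] else []

    evolutions = SimpleStatsEvolutions(schema_fields_func, table.table_schema.id)
    # one SimpleStatsEvolution per data schema id, cached internally
    evolution = evolutions.get_or_create(table.table_schema.id)
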
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index f1ade2c4e8..47a2d86d15 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -32,6 +32,7 @@ from pypaimon.read.scanner.starting_scanner import StartingScanner
 from pypaimon.read.split import Split
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.bucket_mode import BucketMode
+from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
 
 
 class FullStartingScanner(StartingScanner):
@@ -62,6 +63,19 @@ class FullStartingScanner(StartingScanner):
             self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False
         self.data_evolution = self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true'
 
+        self._schema_cache = {}
+
+        def schema_fields_func(schema_id: int):
+            if schema_id not in self._schema_cache:
+                schema = self.table.schema_manager.read_schema(schema_id)
+                self._schema_cache[schema_id] = schema
+            return self._schema_cache[schema_id].fields if self._schema_cache[schema_id] else []
+
+        self.simple_stats_evolutions = SimpleStatsEvolutions(
+            schema_fields_func,
+            self.table.table_schema.id
+        )
+
     def scan(self) -> Plan:
         file_entries = self.plan_files()
         if not file_entries:
@@ -215,22 +229,35 @@ class FullStartingScanner(StartingScanner):
             return False
         if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition):
             return False
+
+        # Get SimpleStatsEvolution for this schema
+        evolution = self.simple_stats_evolutions.get_or_create(entry.file.schema_id)
+
+        # Apply evolution to stats
         if self.table.is_primary_key_table:
             predicate = self.primary_key_predicate
             stats = entry.file.key_stats
+            stats_fields = None
         else:
             predicate = self.predicate
             stats = entry.file.value_stats
+            if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
+                stats_fields = entry.file.write_cols
+            else:
+                stats_fields = entry.file.value_stats_cols
         if not predicate:
             return True
-        return predicate.test_by_stats({
-            "min_values": stats.min_values.to_dict(),
-            "max_values": stats.max_values.to_dict(),
-            "null_counts": {
-                stats.min_values.fields[i].name: stats.null_counts[i] for i in range(len(stats.min_values.fields))
-            },
-            "row_count": entry.file.row_count
-        })
+        evolved_stats = evolution.evolution(
+            stats,
+            entry.file.row_count,
+            stats_fields
+        )
+
+        # Test predicate against evolved stats
+        return predicate.test_by_simple_stats(
+            evolved_stats,
+            entry.file.row_count
+        )
 
     def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
         partitioned_files = defaultdict(list)
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index 31297cc2b3..b9c4cdbddc 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -39,7 +39,7 @@ class SchemaManager:
                 return None
 
             max_version = max(versions)
-            return self._read_schema(max_version)
+            return self.read_schema(max_version)
         except Exception as e:
             raise RuntimeError(f"Failed to load schema from path: {self.schema_path}") from e
 
@@ -64,7 +64,7 @@ class SchemaManager:
     def _to_schema_path(self, schema_id: int) -> Path:
         return self.schema_path / f"{self.schema_prefix}{schema_id}"
 
-    def _read_schema(self, schema_id: int) -> Optional['TableSchema']:
+    def read_schema(self, schema_id: int) -> Optional['TableSchema']:
         schema_path = self._to_schema_path(schema_id)
         if not self.file_io.exists(schema_path):
             return None
diff --git a/paimon-python/pypaimon/table/row/binary_row.py b/paimon-python/pypaimon/table/row/binary_row.py
new file mode 100644
index 0000000000..f908935d44
--- /dev/null
+++ b/paimon-python/pypaimon/table/row/binary_row.py
@@ -0,0 +1,61 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from typing import Any, List
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.row.internal_row import InternalRow
+from pypaimon.table.row.row_kind import RowKind
+
+
+class BinaryRow(InternalRow):
+    """
+    BinaryRow is a compact binary format for storing a row of data.
+    """
+
+    def __init__(self, data: bytes, fields: List[DataField]):
+        """
+        Initialize BinaryRow with raw binary data and field definitions.
+        """
+        self.data = data
+        self.arity = int.from_bytes(data[:4], 'big')
+        # Skip the arity prefix (4 bytes) if present
+        self.actual_data = data[4:] if len(data) >= 4 else data
+        self.fields = fields
+        self.row_kind = RowKind(self.actual_data[0])
+
+    def get_field(self, index: int) -> Any:
+        from pypaimon.table.row.generic_row import GenericRowDeserializer
+        """Get field value by index."""
+        if index >= self.arity or index < 0:
+            raise IndexError(f"Field index {index} out of range [0, {self.arity})")
+
+        if GenericRowDeserializer.is_null_at(self.actual_data, 0, index):
+            return None
+
+        return GenericRowDeserializer.parse_field_value(self.actual_data, 0,
+                                                        GenericRowDeserializer.calculate_bit_set_width_in_bytes(
+                                                            self.arity),
+                                                        index, self.fields[index].type)
+
+    def get_row_kind(self) -> RowKind:
+        return self.row_kind
+
+    def is_null_at(self, pos: int) -> bool:
+        return self.get_field(pos) is None
+
+    def __len__(self):
+        return self.arity
diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py
index b409d3f2eb..13e3742110 100644
--- a/paimon-python/pypaimon/table/row/generic_row.py
+++ b/paimon-python/pypaimon/table/row/generic_row.py
@@ -17,12 +17,14 @@
 ################################################################################
 
 import struct
-from dataclasses import dataclass
 from datetime import date, datetime, time, timedelta
 from decimal import Decimal
-from typing import Any, List
+from typing import Any, List, Union
+
+from dataclasses import dataclass
 
 from pypaimon.schema.data_types import AtomicType, DataField, DataType
+from pypaimon.table.row.binary_row import BinaryRow
 from pypaimon.table.row.internal_row import InternalRow, RowKind
 from pypaimon.table.row.blob import BlobData
 
@@ -38,7 +40,7 @@ class GenericRow(InternalRow):
     def to_dict(self):
         return {self.fields[i].name: self.values[i] for i in range(len(self.fields))}
 
-    def get_field(self, pos: int):
+    def get_field(self, pos: int) -> Any:
         if pos >= len(self.values):
             raise IndexError(f"Position {pos} is out of bounds for row arity {len(self.values)}")
         return self.values[pos]
@@ -74,28 +76,28 @@ class GenericRowDeserializer:
             actual_data = bytes_data[4:]
 
         fields = []
-        null_bits_size_in_bytes = cls._calculate_bit_set_width_in_bytes(arity)
+        null_bits_size_in_bytes = cls.calculate_bit_set_width_in_bytes(arity)
         for i, data_field in enumerate(data_fields):
             value = None
-            if not cls._is_null_at(actual_data, 0, i):
-                value = cls._parse_field_value(actual_data, 0, null_bits_size_in_bytes, i, data_field.type)
+            if not cls.is_null_at(actual_data, 0, i):
+                value = cls.parse_field_value(actual_data, 0, null_bits_size_in_bytes, i, data_field.type)
             fields.append(value)
 
         return GenericRow(fields, data_fields, RowKind(actual_data[0]))
 
     @classmethod
-    def _calculate_bit_set_width_in_bytes(cls, arity: int) -> int:
+    def calculate_bit_set_width_in_bytes(cls, arity: int) -> int:
         return ((arity + 63 + cls.HEADER_SIZE_IN_BITS) // 64) * 8
 
     @classmethod
-    def _is_null_at(cls, bytes_data: bytes, offset: int, pos: int) -> bool:
+    def is_null_at(cls, bytes_data: bytes, offset: int, pos: int) -> bool:
         index = pos + cls.HEADER_SIZE_IN_BITS
         byte_index = offset + (index // 8)
         bit_index = index % 8
         return (bytes_data[byte_index] & (1 << bit_index)) != 0
 
     @classmethod
-    def _parse_field_value(
+    def parse_field_value(
             cls,
             bytes_data: bytes,
             base_offset: int,
@@ -264,17 +266,19 @@ class GenericRowSerializer:
     MAX_FIX_PART_DATA_SIZE = 7
 
     @classmethod
-    def to_bytes(cls, binary_row: GenericRow) -> bytes:
-        arity = len(binary_row.fields)
+    def to_bytes(cls, row: Union[GenericRow, BinaryRow]) -> bytes:
+        if isinstance(row, BinaryRow):
+            return row.data
+        arity = len(row.fields)
         null_bits_size_in_bytes = cls._calculate_bit_set_width_in_bytes(arity)
         fixed_part_size = null_bits_size_in_bytes + arity * 8
         fixed_part = bytearray(fixed_part_size)
-        fixed_part[0] = binary_row.row_kind.value
+        fixed_part[0] = row.row_kind.value
 
         variable_part_data = []
         current_variable_offset = 0
 
-        for i, (value, field) in enumerate(zip(binary_row.values, binary_row.fields)):
+        for i, (value, field) in enumerate(zip(row.values, row.fields)):
             field_fixed_offset = null_bits_size_in_bytes + i * 8
 
             if value is None:
diff --git a/paimon-python/pypaimon/table/row/projected_row.py b/paimon-python/pypaimon/table/row/projected_row.py
new file mode 100644
index 0000000000..502338a605
--- /dev/null
+++ b/paimon-python/pypaimon/table/row/projected_row.py
@@ -0,0 +1,84 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Any, List
+from pypaimon.table.row.internal_row import InternalRow
+from pypaimon.table.row.row_kind import RowKind
+
+
+class ProjectedRow(InternalRow):
+    """
+    An implementation of InternalRow which provides a projected view of the underlying InternalRow.
+    Projection includes both reducing the accessible fields and reordering them.
+    Note: This class supports only top-level projections, not nested projections.
+    """
+
+    def __init__(self, index_mapping: List[int]):
+        """
+        Initialize ProjectedRow with index mapping.
+        Args:
+            index_mapping: Array representing the mapping of fields. For example,
+            [0, 2, 1] specifies to include in the following order the 1st field, the 3rd field
+            and the 2nd field of the row.
+        """
+        self.index_mapping = index_mapping
+        self.row = None
+
+    def replace_row(self, row: InternalRow) -> 'ProjectedRow':
+        self.row = row
+        return self
+
+    def get_field(self, pos: int) -> Any:
+        """Returns the value at the given position."""
+        if self.index_mapping[pos] < 0:
+            # TODO move this logical to hive
+            return None
+        return self.row.get_field(self.index_mapping[pos])
+
+    def is_null_at(self, pos: int) -> bool:
+        """Returns true if the element is null at the given position."""
+        if self.index_mapping[pos] < 0:
+            # TODO move this logical to hive
+            return True
+        return self.row.is_null_at(self.index_mapping[pos])
+
+    def get_row_kind(self) -> RowKind:
+        """Returns the kind of change that this row describes in a changelog."""
+        return self.row.get_row_kind()
+
+    def __len__(self) -> int:
+        """Returns the number of fields in this row."""
+        return len(self.row)
+
+    def __str__(self) -> str:
+        """String representation of the projected row."""
+        return (f"{self.row.get_row_kind().name if self.row else 'None'}"
+                f"{{index_mapping={self.index_mapping}, row={self.row}}}")
+
+    @staticmethod
+    def from_index_mapping(projection: List[int]) -> 'ProjectedRow':
+        """
+        Create an empty ProjectedRow starting from a projection array.
+        Args:
+            projection: Array representing the mapping of fields. For example,
+                       [0, 2, 1] specifies to include in the following order
+                       the 1st field, the 3rd field and the 2nd field of the row.
+        Returns:
+            ProjectedRow instance
+        """
+        return ProjectedRow(projection)
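
A tiny usage sketch (not part of the commit), assuming `fields` is a List[DataField] describing three columns:

    from pypaimon.table.row.generic_row import GenericRow
    from pypaimon.table.row.projected_row import ProjectedRow

    row = GenericRow([1, 'a', 7], fields)
    # keep only the 3rd and 1st fields, in that order
    projected = ProjectedRow.from_index_mapping([2, 0]).replace_row(row)
    projected.get_field(0)  # -> 7
    projected.get_field(1)  # -> 1
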
diff --git a/paimon-python/pypaimon/tests/binary_row_test.py b/paimon-python/pypaimon/tests/binary_row_test.py
new file mode 100644
index 0000000000..5bfabfb121
--- /dev/null
+++ b/paimon-python/pypaimon/tests/binary_row_test.py
@@ -0,0 +1,334 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import random
+import tempfile
+import unittest
+from typing import List
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
+from pypaimon.table.row.generic_row import GenericRow, GenericRowDeserializer
+
+
+def _random_format():
+    return random.choice(['parquet', 'avro', 'orc'])
+
+
+class BinaryRowTest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', False)
+        pa_schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string()),
+            ('f2', pa.int64()),
+        ])
+        cls.catalog.create_table('default.test_append', Schema.from_pyarrow_schema(
+            pa_schema, partition_keys=['f0'], options={'file.format': _random_format()}), False)
+        cls.catalog.create_table('default.test_pk', Schema.from_pyarrow_schema(
+            pa_schema, partition_keys=['f2'], primary_keys=['f0'],
+            options={'bucket': '1', 'file.format': _random_format()}), False)
+        cls.data = pa.Table.from_pydict({
+            'f0': [1, 2, 3, 4, 5],
+            'f1': ['abc', 'abbc', 'bc', 'd', None],
+            'f2': [6, 7, 8, 9, 10],
+        }, schema=pa_schema)
+
+        append_table = cls.catalog.get_table('default.test_append')
+        write_builder = append_table.new_batch_write_builder()
+        write = write_builder.new_write()
+        commit = write_builder.new_commit()
+        write.write_arrow(cls.data)
+        commit.commit(write.prepare_commit())
+        write.close()
+        commit.close()
+
+        pk_table = cls.catalog.get_table('default.test_pk')
+        write_builder = pk_table.new_batch_write_builder()
+        write = write_builder.new_write()
+        commit = write_builder.new_commit()
+        write.write_arrow(cls.data)
+        commit.commit(write.prepare_commit())
+        write.close()
+        commit.close()
+
+    def test_not_equal_append(self):
+        table = self.catalog.get_table('default.test_append')
+        self._overwrite_manifest_entry(table)
+        read_builder = table.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        predicate = predicate_builder.not_equal('f2', 6)  # test stats filter when filtering ManifestEntry
+        splits, actual = self._read_result(read_builder.with_filter(predicate))
+        expected = self.data.slice(1, 4)
+        self.assertEqual(expected, actual)
+        self.assertEqual(len(expected), len(splits))
+
+    def test_less_than_append(self):
+        table = self.catalog.get_table('default.test_append')
+
+        self._overwrite_manifest_entry(table)
+
+        read_builder = table.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        predicate = predicate_builder.less_than('f2', 8)
+        splits, actual = self._read_result(read_builder.with_filter(predicate))
+        expected = self.data.slice(0, 2)
+        self.assertEqual(actual, expected)
+        self.assertEqual(len(expected), len(splits))  # test stats filter when filtering ManifestEntry
+
+    def test_is_null_append(self):
+        table = self.catalog.get_table('default.test_append')
+        read_builder = table.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        predicate = predicate_builder.is_null('f1')  # value_stats_cols=None
+        splits, actual = self._read_result(read_builder.with_filter(predicate))
+        expected = self.data.slice(4, 1)
+        self.assertEqual(expected, actual)
+        self.assertEqual(len(expected), len(splits))
+
+    def test_is_not_null_append(self):
+        table = self.catalog.get_table('default.test_append')
+        starting_scanner = FullStartingScanner(table, None, None)
+        latest_snapshot = starting_scanner.snapshot_manager.get_latest_snapshot()
+        manifest_files = starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+        self._transform_manifest_entries(manifest_entries, [])
+        l = ['abc', 'abbc', 'bc', 'd', None]
+        for i, entry in enumerate(manifest_entries):
+            entry.file.value_stats_cols = ['f1']
+            entry.file.value_stats = SimpleStats(
+                GenericRow([l[i]], [table.fields[1]]),
+                GenericRow([l[i]], [table.fields[1]]),
+                [1 if l[i] is None else 0],
+            )
+        starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, manifest_entries)
+
+        read_builder = table.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        predicate = predicate_builder.is_not_null('f1')
+        splits, actual = self._read_result(read_builder.with_filter(predicate))
+        expected = self.data.slice(0, 4)
+        self.assertEqual(expected, actual)
+        self.assertEqual(len(expected), len(splits))
+
+    def test_is_in_append(self):
+        table = self.catalog.get_table('default.test_append')
+        self._overwrite_manifest_entry(table)
+        read_builder = table.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        predicate = predicate_builder.is_in('f2', [6, 8])
+        splits, actual = self._read_result(read_builder.with_filter(predicate))
+        expected = self.data.take([0, 2])
+        self.assertEqual(expected, actual)
+        self.assertEqual(len(expected), len(splits))
+
+    def test_equal_pk(self):
+        table = self.catalog.get_table('default.test_pk')
+        read_builder = table.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        predicate = predicate_builder.equal('f2', 6)
+        splits, actual = self._read_result(read_builder.with_filter(predicate))
+        expected = self.data.slice(0, 1)
+        self.assertEqual(expected, actual)
+        self.assertEqual(len(splits), len(expected))  # test partition filter when filtering ManifestEntry
+
+    def test_not_equal_pk(self):
+        table = self.catalog.get_table('default.test_pk')
+        read_builder = table.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        predicate = predicate_builder.not_equal('f2', 6)
+        splits, actual = self._read_result(read_builder.with_filter(predicate))
+        expected = self.data.slice(1, 4)
+        self.assertEqual(actual, expected)
+        self.assertEqual(len(splits), len(expected))  # test partition filter when filtering ManifestEntry
+
+    def test_less_than_pk(self):
+        table = self.catalog.get_table('default.test_pk')
+        read_builder = table.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        predicate = predicate_builder.less_than('f0', 3)
+        splits, actual = self._read_result(read_builder.with_filter(predicate))
+        expected = self.data.slice(0, 2)
+        self.assertEqual(expected, actual)
+        self.assertEqual(len(expected), len(splits))  # test key stats filter when filtering ManifestEntry
+
+    def test_is_null_pk(self):
+        table = self.catalog.get_table('default.test_pk')
+        read_builder = table.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        predicate = predicate_builder.is_null('f1')
+        splits, actual = self._read_result(read_builder.with_filter(predicate))
+        expected = self.data.slice(4, 1)
+        self.assertEqual(actual, expected)
+
+    def test_is_not_null_pk(self):
+        table = self.catalog.get_table('default.test_pk')
+        read_builder = table.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        predicate = predicate_builder.is_not_null('f1')
+        splits, actual = self._read_result(read_builder.with_filter(predicate))
+        expected = self.data.slice(0, 4)
+        self.assertEqual(actual, expected)
+
+    def test_is_in_pk(self):
+        table = self.catalog.get_table('default.test_pk')
+        read_builder = table.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        predicate = predicate_builder.is_in('f0', [1, 5])
+        splits, actual = self._read_result(read_builder.with_filter(predicate))
+        # expected rows: indices [0, 4]
+        expected = self.data.take([0, 4])
+        self.assertEqual(actual, expected)
+        self.assertEqual(len(splits), len(expected))  # test key stats filter when filtering ManifestEntry
+
+    def test_append_multi_cols(self):
+        # Create a 10-column append table and write 10 rows
+        pa_schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string()),
+            ('f2', pa.int64()),
+            ('f3', pa.string()),
+            ('f4', pa.int64()),
+            ('f5', pa.string()),
+            ('f6', pa.int64()),
+            ('f7', pa.string()),
+            ('f8', pa.int64()),
+            ('f9', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            partition_keys=['f0'],
+            options={'file.format': _random_format()}
+        )
+        self.catalog.create_table('default.test_append_10cols', schema, False)
+        table = self.catalog.get_table('default.test_append_10cols')
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        data = {
+            'f0': list(range(1, 11)),  # values 1..10
+            'f1': ['a0', 'bb', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7', 'a8', 'a9'],  # contains 'bb' at index 1
+            'f2': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100],
+            'f3': ['x0', 'x1', 'x2', 'x3', 'x4', 'x5', 'x6', 'x7', 'x8', 'x9'],
+            'f4': [0, 1, 0, 1, 0, 1, 0, 1, 0, 1],
+            'f5': ['y0', 'y1', 'y2', 'y3', 'y4', 'y5', 'y6', 'y7', 'y8', 'y9'],
+            'f6': [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000],
+            'f7': ['z0', 'z1', 'z2', 'z3', 'z4', 'z5', 'z6', 'z7', 'z8', 'z9'],
+            'f8': [5, 4, 3, 2, 1, 0, -1, -2, -3, -4],
+            'f9': ['w0', 'w1', 'w2', 'w3', 'w4', 'w5', 'w6', 'w7', 'w8', 'w9'],
+        }
+        pa_table = pa.Table.from_pydict(data, schema=pa_schema)
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        starting_scanner = FullStartingScanner(table, None, None)
+        latest_snapshot = starting_scanner.snapshot_manager.get_latest_snapshot()
+        manifest_files = starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+        self._transform_manifest_entries(manifest_entries, [])
+        for i, entry in enumerate(manifest_entries):
+            entry.file.value_stats_cols = ['f2', 'f6', 'f8']
+            entry.file.value_stats = SimpleStats(
+                GenericRow([10 * (i + 1), 100 * (i + 1), 5 - i], [table.fields[2], table.fields[6], table.fields[8]]),
+                GenericRow([10 * (i + 1), 100 * (i + 1), 5 - i], [table.fields[2], table.fields[6], table.fields[8]]),
+                [0, 0, 0],
+            )
+        starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, manifest_entries)
+        # Build multiple predicates and combine them
+        read_builder = table.new_read_builder()
+        predicate_builder = read_builder.new_predicate_builder()
+        p_in = predicate_builder.is_in('f6', [100, 600, 1000])
+        p_contains = predicate_builder.less_or_equal('f8', -3)
+        p_not_null = predicate_builder.is_not_null('f2')
+        p_ge = predicate_builder.greater_or_equal('f2', 50)
+        p_or = predicate_builder.or_predicates([p_in, p_contains])
+        combined = predicate_builder.and_predicates([p_or, p_not_null, p_ge])
+
+        splits, actual = self._read_result(read_builder.with_filter(combined))
+
+        # Expected rows after filter: indices 5, 8 and 9
+        expected_data = {'f0': [6, 9, 10],
+                         'f1': ['a5', 'a8', 'a9'],
+                         'f2': [60, 90, 100],
+                         'f3': ['x5', 'x8', 'x9'],
+                         'f4': [1, 0, 1],
+                         'f5': ['y5', 'y8', 'y9'],
+                         'f6': [600, 900, 1000],
+                         'f7': ['z5', 'z8', 'z9'],
+                         'f8': [0, -3, -4],
+                         'f9': ['w5', 'w8', 'w9']
+                         }
+        self.assertEqual(expected_data, actual.to_pydict())
+
+        starting_scanner = FullStartingScanner(table, None, None)
+        latest_snapshot = starting_scanner.snapshot_manager.get_latest_snapshot()
+        manifest_files = starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+        self._transform_manifest_entries(manifest_entries, [])
+        for i, entry in enumerate(manifest_entries):
+            entry.file.value_stats_cols = ['f2', 'f6', 'f8']
+            entry.file.value_stats = SimpleStats(
+                GenericRow([0, 100 * (i + 1), 5 - i], [table.fields[2], table.fields[6], table.fields[8]]),
+                GenericRow([0, 100 * (i + 1), 5 - i], [table.fields[2], table.fields[6], table.fields[8]]),
+                [0, 0, 0],
+            )
+        starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, manifest_entries)
+        splits, actual = self._read_result(read_builder.with_filter(combined))
+        self.assertFalse(actual)
+
+    def _read_result(self, read_builder):
+        scan = read_builder.new_scan()
+        read = read_builder.new_read()
+        splits = scan.plan().splits()
+        actual = read.to_arrow(splits)
+        return splits, actual
+
+    def _transform_manifest_entries(self, manifest_entries: List[ManifestEntry], trimmed_pk_fields):
+        for entry in manifest_entries:
+            entry.file.key_stats.min_values = GenericRowDeserializer.from_bytes(entry.file.key_stats.min_values.data,
+                                                                                trimmed_pk_fields)
+            entry.file.key_stats.max_values = GenericRowDeserializer.from_bytes(entry.file.key_stats.max_values.data,
+                                                                                trimmed_pk_fields)
+
+    def _overwrite_manifest_entry(self, table):
+        starting_scanner = FullStartingScanner(table, None, None)
+        latest_snapshot = starting_scanner.snapshot_manager.get_latest_snapshot()
+        manifest_files = starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+        self._transform_manifest_entries(manifest_entries, [])
+        for i, entry in enumerate(manifest_entries):
+            entry.file.value_stats_cols = ['f2']
+            entry.file.value_stats = SimpleStats(
+                GenericRow([6 + i], [table.fields[2]]),
+                GenericRow([6 + i], [table.fields[2]]),
+                [0],
+            )
+        starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, manifest_entries)
diff --git a/paimon-python/pypaimon/tests/predicates_test.py b/paimon-python/pypaimon/tests/predicates_test.py
index 6158d1d88b..561641589f 100644
--- a/paimon-python/pypaimon/tests/predicates_test.py
+++ b/paimon-python/pypaimon/tests/predicates_test.py
@@ -24,6 +24,7 @@ import pandas as pd
 import pyarrow as pa
 
 from pypaimon import CatalogFactory, Schema
+from pypaimon.table.row.generic_row import GenericRowDeserializer
 
 
 def _check_filtered_result(read_builder, expected_df):
@@ -454,20 +455,26 @@ class PredicateTest(unittest.TestCase):
             if split.partition.values == ["p1", 2]:
                 count += 1
                 self.assertEqual(len(split.files), 1)
-                min_values = split.files[0].key_stats.min_values.to_dict()
-                max_values = split.files[0].key_stats.max_values.to_dict()
+                min_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data,
+                                                               table.table_schema.get_primary_key_fields()).to_dict()
+                max_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data,
+                                                               table.table_schema.get_primary_key_fields()).to_dict()
                 self.assertTrue(min_values["key1"] == 1 and min_values["key2"] == "e"
                                 and max_values["key1"] == 4 and max_values["key2"] == "h")
             elif split.partition.values == ["p2", 2]:
                 count += 1
-                min_values = split.files[0].key_stats.min_values.to_dict()
-                max_values = split.files[0].key_stats.max_values.to_dict()
+                min_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data,
+                                                               table.table_schema.get_primary_key_fields()).to_dict()
+                max_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data,
+                                                               table.table_schema.get_primary_key_fields()).to_dict()
                 self.assertTrue(min_values["key1"] == 5 and min_values["key2"] == "a"
                                 and max_values["key1"] == 8 and max_values["key2"] == "d")
             elif split.partition.values == ["p1", 1]:
                 count += 1
-                min_values = split.files[0].key_stats.min_values.to_dict()
-                max_values = split.files[0].key_stats.max_values.to_dict()
+                min_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data,
+                                                               table.table_schema.get_primary_key_fields()).to_dict()
+                max_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data,
+                                                               table.table_schema.get_primary_key_fields()).to_dict()
                 self.assertTrue(min_values["key1"] == max_values["key1"] == 7
                                 and max_values["key2"] == max_values["key2"] == "b")
         self.assertEqual(count, 3)
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index 9be66d9759..6e6d57f963 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -183,8 +183,10 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
             manifest_files[0].file_name,
             lambda row: table_scan.starting_scanner._filter_manifest_entry(row),
             drop_stats=False)
-        min_value_stats = manifest_entries[0].file.value_stats.min_values.values
-        max_value_stats = manifest_entries[0].file.value_stats.max_values.values
+        min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
+                                                            table.fields).values
+        max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
+                                                            table.fields).values
         expected_min_values = [col[0].as_py() for col in expect_data]
         expected_max_values = [col[1].as_py() for col in expect_data]
         self.assertEqual(min_value_stats, expected_min_values)
@@ -865,23 +867,27 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
         # Verify value_stats structure based on the logic
         if value_stats_cols is None:
             # Should use all table fields - verify we have data for all fields
-            self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count)
-            self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count)
+            self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count)
+            self.assertEqual(read_entry.file.value_stats.max_values.arity, expected_fields_count)
             self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count)
         elif not value_stats_cols:  # Empty list
             # Should use empty fields - verify we have no field data
-            self.assertEqual(len(read_entry.file.value_stats.min_values.values), 0)
-            self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0)
+            self.assertEqual(read_entry.file.value_stats.min_values.arity, 0)
+            self.assertEqual(read_entry.file.value_stats.max_values.arity, 0)
             self.assertEqual(len(read_entry.file.value_stats.null_counts), 0)
         else:
             # Should use specified fields - verify we have data for specified fields only
-            self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count)
-            self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count)
+            self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count)
+            self.assertEqual(read_entry.file.value_stats.max_values.arity, expected_fields_count)
             self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count)
 
         # Verify the actual values match what we expect
         if expected_fields_count > 0:
-            self.assertEqual(read_entry.file.value_stats.min_values.values, min_values)
-            self.assertEqual(read_entry.file.value_stats.max_values.values, max_values)
+            self.assertEqual(
+                GenericRowDeserializer.from_bytes(read_entry.file.value_stats.min_values.data, test_fields).values,
+                min_values)
+            self.assertEqual(
+                GenericRowDeserializer.from_bytes(read_entry.file.value_stats.max_values.data, test_fields).values,
+                max_values)
 
         self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index a06e120e95..ebe98e4fc5 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -37,7 +37,7 @@ from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
                                         MapType, PyarrowFieldParser)
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
-from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.row.generic_row import GenericRow, GenericRowDeserializer
 from pypaimon.write.file_store_commit import FileStoreCommit
 
 
@@ -226,8 +226,10 @@ class ReaderBasicTest(unittest.TestCase):
         manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
         manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
             manifest_files[0].file_name, lambda row: table_scan.starting_scanner._filter_manifest_entry(row), False)
-        min_value_stats = manifest_entries[0].file.value_stats.min_values.values
-        max_value_stats = manifest_entries[0].file.value_stats.max_values.values
+        min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
+                                                            table.fields).values
+        max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
+                                                            table.fields).values
         expected_min_values = [col[0].as_py() for col in expect_data]
         expected_max_values = [col[1].as_py() for col in expect_data]
         self.assertEqual(min_value_stats, expected_min_values)
@@ -649,23 +651,27 @@ class ReaderBasicTest(unittest.TestCase):
         # Verify value_stats structure based on the logic
         if value_stats_cols is None:
             # Should use all table fields - verify we have data for all fields
-            self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count)
-            self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count)
+            self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count)
+            self.assertEqual(read_entry.file.value_stats.max_values.arity, expected_fields_count)
             self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count)
         elif not value_stats_cols:  # Empty list
             # Should use empty fields - verify we have no field data
-            self.assertEqual(len(read_entry.file.value_stats.min_values.values), 0)
-            self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0)
+            self.assertEqual(read_entry.file.value_stats.min_values.arity, 0)
+            self.assertEqual(read_entry.file.value_stats.max_values.arity, 0)
             self.assertEqual(len(read_entry.file.value_stats.null_counts), 0)
         else:
             # Should use specified fields - verify we have data for specified fields only
-            self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count)
-            self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count)
+            self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count)
+            self.assertEqual(read_entry.file.value_stats.max_values.arity, expected_fields_count)
             self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count)
 
         # Verify the actual values match what we expect
         if expected_fields_count > 0:
-            self.assertEqual(read_entry.file.value_stats.min_values.values, min_values)
-            self.assertEqual(read_entry.file.value_stats.max_values.values, max_values)
+            self.assertEqual(
+                GenericRowDeserializer.from_bytes(read_entry.file.value_stats.min_values.data, test_fields).values,
+                min_values)
+            self.assertEqual(
+                GenericRowDeserializer.from_bytes(read_entry.file.value_stats.max_values.data, test_fields).values,
+                max_values)
 
         self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)
