This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e2591d1417 [python] light refactor for stats collect (#6941)
e2591d1417 is described below

commit e2591d14174a17c63c737b578eb85c44aaf4224b
Author: XiaoHongbo <[email protected]>
AuthorDate: Sun Jan 4 15:25:10 2026 +0800

    [python] light refactor for stats collect (#6941)
---
 paimon-python/pypaimon/tests/reader_base_test.py   | 190 ++++++++++++++++++++-
 .../pypaimon/write/writer/data_blob_writer.py      |  19 +--
 paimon-python/pypaimon/write/writer/data_writer.py |  57 +++++--
 3 files changed, 226 insertions(+), 40 deletions(-)
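
At a glance, the patch collapses the three parallel list comprehensions each
writer built per call site (min values, max values, null counts) into a single
_collect_value_stats helper returning SimpleStats, with SimpleStats.empty_stats()
as the canonical empty value when stats are disabled. A minimal standalone
sketch of that shape, using simplified stand-ins for pypaimon's SimpleStats and
GenericRow (the real classes carry more behavior, and fields are DataField
objects rather than plain names):

    from dataclasses import dataclass
    from typing import Dict, List

    @dataclass
    class GenericRow:          # stand-in for pypaimon's GenericRow
        values: List
        fields: List

    @dataclass
    class SimpleStats:         # stand-in for pypaimon's SimpleStats
        min_values: GenericRow
        max_values: GenericRow
        null_counts: List[int]

        @staticmethod
        def empty_stats() -> "SimpleStats":
            return SimpleStats(GenericRow([], []), GenericRow([], []), [])

    def collect_value_stats(fields: List[str],
                            column_stats: Dict[str, Dict]) -> SimpleStats:
        # No fields (e.g. 'metadata.stats-mode': 'none') -> canonical empty stats.
        if not fields:
            return SimpleStats.empty_stats()
        # One helper instead of three separate comprehensions per call site.
        return SimpleStats(
            GenericRow([column_stats[f]["min_values"] for f in fields], fields),
            GenericRow([column_stats[f]["max_values"] for f in fields], fields),
            [column_stats[f]["null_counts"] for f in fields],
        )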

diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index 81a7a5baf9..c6223dadcb 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -546,6 +546,84 @@ class ReaderBasicTest(unittest.TestCase):
                 self.assertFalse(is_system_field,
                                  f"value_stats_cols should not contain system 
field: {field_name}")
 
+    def test_value_stats_empty_when_stats_disabled(self):
+        catalog = CatalogFactory.create({
+            "warehouse": self.warehouse
+        })
+        catalog.create_database("test_db_stats_disabled", True)
+
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('name', pa.string()),
+            ('price', pa.float64()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            primary_keys=['id'],
+            options={'metadata.stats-mode': 'none', 'bucket': '2'}  # Stats disabled
+        )
+        catalog.create_table("test_db_stats_disabled.test_stats_disabled", schema, False)
+        table = catalog.get_table("test_db_stats_disabled.test_stats_disabled")
+
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['Alice', 'Bob', 'Charlie'],
+            'price': [10.5, 20.3, 30.7],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+        manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
+            manifest_files[0].file_name,
+            lambda row: table_scan.starting_scanner._filter_manifest_entry(row),
+            False
+        )
+
+        self.assertGreater(len(manifest_entries), 0, "Should have at least one manifest entry")
+        file_meta = manifest_entries[0].file
+
+        self.assertEqual(
+            file_meta.value_stats_cols, [],
+            "value_stats_cols should be empty list [] when stats are disabled"
+        )
+
+        self.assertEqual(
+            file_meta.value_stats.min_values.arity, 0,
+            "value_stats.min_values should be empty (arity=0) when stats are 
disabled"
+        )
+        self.assertEqual(
+            file_meta.value_stats.max_values.arity, 0,
+            "value_stats.max_values should be empty (arity=0) when stats are 
disabled"
+        )
+        self.assertEqual(
+            len(file_meta.value_stats.null_counts), 0,
+            "value_stats.null_counts should be empty when stats are disabled"
+        )
+
+        empty_stats = SimpleStats.empty_stats()
+        self.assertEqual(
+            file_meta.value_stats.min_values.arity, len(empty_stats.min_values),
+            "value_stats.min_values should be empty (same as SimpleStats.empty_stats()) when stats are disabled"
+        )
+        self.assertEqual(
+            file_meta.value_stats.max_values.arity, len(empty_stats.max_values),
+            "value_stats.max_values should be empty (same as SimpleStats.empty_stats()) when stats are disabled"
+        )
+        self.assertEqual(
+            len(file_meta.value_stats.null_counts), len(empty_stats.null_counts),
+            "value_stats.null_counts should be empty (same as SimpleStats.empty_stats()) when stats are disabled"
+        )
+
     def test_types(self):
         data_fields = [
             DataField(0, "f0", AtomicType('TINYINT'), 'desc'),
@@ -776,12 +854,8 @@ class ReaderBasicTest(unittest.TestCase):
         self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)
 
     def _test_append_only_schema_match_case(self, table, pa_schema):
-        """Test that for append-only tables, data.schema matches table.fields.
+        from pypaimon.schema.data_types import PyarrowFieldParser
 
-        This verifies the assumption in data_writer.py that for append-only tables,
-        PyarrowFieldParser.to_paimon_schema(data.schema) should have the same fields
-        as self.table.fields (same count and same field names).
-        """
         self.assertFalse(table.is_primary_key_table,
                          "Table should be append-only (no primary keys)")
 
@@ -789,7 +863,46 @@ class ReaderBasicTest(unittest.TestCase):
             'id': [1, 2, 3],
             'name': ['Alice', 'Bob', 'Charlie'],
             'price': [10.5, 20.3, 30.7],
-            'category': ['A', 'B', 'C'],
+            'category': ['A', 'B', 'C']
+        }, schema=pa_schema)
+
+        data_fields_from_schema = PyarrowFieldParser.to_paimon_schema(test_data.schema)
+        table_fields = table.fields
+
+        self.assertEqual(
+            len(data_fields_from_schema), len(table_fields),
+            f"Field count mismatch: data.schema has 
{len(data_fields_from_schema)} fields, "
+            f"but table.fields has {len(table_fields)} fields"
+        )
+
+        data_field_names = {field.name for field in data_fields_from_schema}
+        table_field_names = {field.name for field in table_fields}
+        self.assertEqual(
+            data_field_names, table_field_names,
+            f"Field names mismatch: data.schema has {data_field_names}, "
+            f"but table.fields has {table_field_names}"
+        )
+
+    def test_primary_key_value_stats(self):
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('name', pa.string()),
+            ('price', pa.float64()),
+            ('category', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            primary_keys=['id'],
+            options={'metadata.stats-mode': 'full', 'bucket': '2'}
+        )
+        self.catalog.create_table('default.test_pk_value_stats', schema, False)
+        table = self.catalog.get_table('default.test_pk_value_stats')
+
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3, 4, 5],
+            'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
+            'price': [10.5, 20.3, 30.7, 40.1, 50.9],
+            'category': ['A', 'B', 'C', 'D', 'E']
         }, schema=pa_schema)
 
         write_builder = table.new_batch_write_builder()
@@ -831,6 +944,71 @@ class ReaderBasicTest(unittest.TestCase):
             file_meta = manifest_entries[0].file
             self.assertIsNone(file_meta.value_stats_cols,
                               "value_stats_cols should be None when all table 
fields are included")
+        self.assertGreater(len(manifest_entries), 0, "Should have at least one manifest entry")
+        file_meta = manifest_entries[0].file
+
+        key_stats = file_meta.key_stats
+        self.assertIsNotNone(key_stats, "key_stats should not be None")
+        self.assertGreater(key_stats.min_values.arity, 0, "key_stats should contain key fields")
+        self.assertEqual(key_stats.min_values.arity, 1, "key_stats should contain exactly 1 key field (id)")
+
+        value_stats = file_meta.value_stats
+        self.assertIsNotNone(value_stats, "value_stats should not be None")
+        
+        if file_meta.value_stats_cols is None:
+            expected_value_fields = ['name', 'price', 'category']
+            self.assertGreaterEqual(value_stats.min_values.arity, len(expected_value_fields),
+                                    f"value_stats should contain at least {len(expected_value_fields)} value fields")
+        else:
+            self.assertNotIn('id', file_meta.value_stats_cols,
+                             "Key field 'id' should NOT be in 
value_stats_cols")
+            
+            expected_value_fields = ['name', 'price', 'category']
+            self.assertTrue(set(expected_value_fields).issubset(set(file_meta.value_stats_cols)),
+                            f"value_stats_cols should contain value fields: {expected_value_fields}, "
+                            f"but got: {file_meta.value_stats_cols}")
+            
+            expected_arity = len(file_meta.value_stats_cols)
+            self.assertEqual(value_stats.min_values.arity, expected_arity,
+                             f"value_stats should contain {expected_arity} 
fields (matching value_stats_cols), "
+                             f"but got {value_stats.min_values.arity}")
+            self.assertEqual(value_stats.max_values.arity, expected_arity,
+                             f"value_stats should contain {expected_arity} 
fields (matching value_stats_cols), "
+                             f"but got {value_stats.max_values.arity}")
+            self.assertEqual(len(value_stats.null_counts), expected_arity,
+                             f"value_stats null_counts should have 
{expected_arity} elements, "
+                             f"but got {len(value_stats.null_counts)}")
+            
+            self.assertEqual(value_stats.min_values.arity, len(file_meta.value_stats_cols),
+                             f"value_stats.min_values.arity ({value_stats.min_values.arity}) must match "
+                             f"value_stats_cols length ({len(file_meta.value_stats_cols)})")
+            
+            for field_name in file_meta.value_stats_cols:
+                is_system_field = (field_name.startswith('_KEY_') or
+                                   field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID'])
+                self.assertFalse(is_system_field,
+                                 f"value_stats_cols should not contain system 
field: {field_name}")
+            
+            value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields(
+                {'_VALUE_STATS_COLS': file_meta.value_stats_cols},
+                table.fields
+            )
+            min_value_stats = GenericRowDeserializer.from_bytes(
+                value_stats.min_values.data,
+                value_stats_fields
+            ).values
+            max_value_stats = GenericRowDeserializer.from_bytes(
+                value_stats.max_values.data,
+                value_stats_fields
+            ).values
+
+            self.assertEqual(len(min_value_stats), 3, "min_value_stats should have 3 values")
+            self.assertEqual(len(max_value_stats), 3, "max_value_stats should have 3 values")
+        
+        actual_data = read_builder.new_read().to_arrow(table_scan.plan().splits())
+        self.assertEqual(actual_data.num_rows, 5, "Should have 5 rows")
+        actual_ids = sorted(actual_data.column('id').to_pylist())
+        self.assertEqual(actual_ids, [1, 2, 3, 4, 5], "All IDs should be present")
 
     def test_split_target_size(self):
         """Test source.split.target-size configuration effect on split 
generation."""
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index eaf2b9483c..8cdd7428dc 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -276,14 +276,7 @@ class DataBlobWriter(DataWriter):
         # Column stats (only for normal columns)
         metadata_stats_enabled = self.options.metadata_stats_enabled()
         stats_columns = self.normal_columns if metadata_stats_enabled else []
-        column_stats = {
-            field.name: self._get_column_stats(data, field.name)
-            for field in stats_columns
-        }
-
-        min_value_stats = [column_stats[field.name]['min_values'] for field in stats_columns]
-        max_value_stats = [column_stats[field.name]['max_values'] for field in stats_columns]
-        value_null_counts = [column_stats[field.name]['null_counts'] for field in stats_columns]
+        value_stats = self._collect_value_stats(data, stats_columns)
 
         self.sequence_generator.start = self.sequence_generator.current
 
@@ -293,14 +286,8 @@ class DataBlobWriter(DataWriter):
             row_count=data.num_rows,
             min_key=GenericRow([], []),
             max_key=GenericRow([], []),
-            key_stats=SimpleStats(
-                GenericRow([], []),
-                GenericRow([], []),
-                []),
-            value_stats=SimpleStats(
-                GenericRow(min_value_stats, stats_columns),
-                GenericRow(max_value_stats, stats_columns),
-                value_null_counts),
+            key_stats=SimpleStats.empty_stats(),
+            value_stats=value_stats,
             min_sequence_number=-1,
             max_sequence_number=-1,
             schema_id=self.table.table_schema.id,
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 38dd58c15e..fa5f004b8e 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -201,17 +201,14 @@ class DataWriter(ABC):
             field.name: self._get_column_stats(data, field.name)
             for field in stats_fields
         }
-        data_fields = stats_fields if value_stats_enabled else []
-        min_value_stats = [column_stats[field.name]['min_values'] for field in data_fields]
-        max_value_stats = [column_stats[field.name]['max_values'] for field in data_fields]
-        value_null_counts = [column_stats[field.name]['null_counts'] for field in data_fields]
         key_fields = self.trimmed_primary_keys_fields
-        min_key_stats = [column_stats[field.name]['min_values'] for field in key_fields]
-        max_key_stats = [column_stats[field.name]['max_values'] for field in key_fields]
-        key_null_counts = [column_stats[field.name]['null_counts'] for field in key_fields]
-        if not all(count == 0 for count in key_null_counts):
+        key_stats = self._collect_value_stats(data, key_fields, column_stats)
+        if not all(count == 0 for count in key_stats.null_counts):
             raise RuntimeError("Primary key should not be null")
 
+        value_fields = stats_fields if value_stats_enabled else []
+        value_stats = self._collect_value_stats(data, value_fields, column_stats)
+
         min_seq = self.sequence_generator.start
         max_seq = self.sequence_generator.current
         self.sequence_generator.start = self.sequence_generator.current
@@ -221,16 +218,8 @@ class DataWriter(ABC):
             row_count=data.num_rows,
             min_key=GenericRow(min_key, self.trimmed_primary_keys_fields),
             max_key=GenericRow(max_key, self.trimmed_primary_keys_fields),
-            key_stats=SimpleStats(
-                GenericRow(min_key_stats, self.trimmed_primary_keys_fields),
-                GenericRow(max_key_stats, self.trimmed_primary_keys_fields),
-                key_null_counts,
-            ),
-            value_stats=SimpleStats(
-                GenericRow(min_value_stats, data_fields),
-                GenericRow(max_value_stats, data_fields),
-                value_null_counts,
-            ),
+            key_stats=key_stats,
+            value_stats=value_stats,
             min_sequence_number=min_seq,
             max_sequence_number=max_seq,
             schema_id=self.table.table_schema.id,
@@ -278,6 +267,27 @@ class DataWriter(ABC):
 
         return best_split
 
+    def _collect_value_stats(self, data: pa.Table, fields: List,
+                             column_stats: Optional[Dict[str, Dict]] = None) -> SimpleStats:
+        if not fields:
+            return SimpleStats.empty_stats()
+        
+        if column_stats is None or not column_stats:
+            column_stats = {
+                field.name: self._get_column_stats(data, field.name)
+                for field in fields
+            }
+        
+        min_stats = [column_stats[field.name]['min_values'] for field in fields]
+        max_stats = [column_stats[field.name]['max_values'] for field in fields]
+        null_counts = [column_stats[field.name]['null_counts'] for field in fields]
+        
+        return SimpleStats(
+            GenericRow(min_stats, fields),
+            GenericRow(max_stats, fields),
+            null_counts
+        )
+
     @staticmethod
     def _get_column_stats(record_batch: pa.RecordBatch, column_name: str) -> Dict:
         column_array = record_batch.column(column_name)
@@ -287,6 +297,17 @@ class DataWriter(ABC):
                 "max_values": None,
                 "null_counts": column_array.null_count,
             }
+        
+        column_type = column_array.type
+        supports_minmax = not (pa.types.is_nested(column_type) or pa.types.is_map(column_type))
+        
+        if not supports_minmax:
+            return {
+                "min_values": None,
+                "max_values": None,
+                "null_counts": column_array.null_count,
+            }
+        
         min_values = pc.min(column_array).as_py()
         max_values = pc.max(column_array).as_py()
         null_counts = column_array.null_count
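
For completeness: the new guard in _get_column_stats skips min/max aggregation
for column types that pyarrow's pc.min/pc.max cannot reduce. A standalone
sketch of that behavior under simplified assumptions (note that in pyarrow,
map is itself a nested type, so pa.types.is_nested alone covers the check in
the patch):

    import pyarrow as pa
    import pyarrow.compute as pc

    def column_stats(column: pa.ChunkedArray) -> dict:
        # Empty columns and nested types (list/struct/map) get no min/max;
        # the null count is still cheap to report either way.
        if len(column) == 0 or pa.types.is_nested(column.type):
            return {"min_values": None, "max_values": None,
                    "null_counts": column.null_count}
        return {"min_values": pc.min(column).as_py(),
                "max_values": pc.max(column).as_py(),
                "null_counts": column.null_count}

    t = pa.table({"id": [1, 2, None], "tags": [["a"], [], None]})
    print(column_stats(t.column("id")))    # min 1, max 2, one null
    print(column_stats(t.column("tags")))  # nested list -> min/max skipped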
