This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e2591d1417 [python] light refactor for stats collect (#6941)
e2591d1417 is described below
commit e2591d14174a17c63c737b578eb85c44aaf4224b
Author: XiaoHongbo <[email protected]>
AuthorDate: Sun Jan 4 15:25:10 2026 +0800
[python] light refactor for stats collect (#6941)
---
paimon-python/pypaimon/tests/reader_base_test.py | 190 ++++++++++++++++++++-
.../pypaimon/write/writer/data_blob_writer.py | 19 +--
paimon-python/pypaimon/write/writer/data_writer.py | 57 +++++--
3 files changed, 226 insertions(+), 40 deletions(-)
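For orientation before the diff: the refactor folds the repeated min/max/null-count bookkeeping in both writers into a single _collect_value_stats helper that returns a SimpleStats, falling back to SimpleStats.empty_stats() when no fields are requested. Below is a minimal, self-contained sketch of that shape; SimpleStatsSketch and GenericRowSketch are simplified stand-ins for pypaimon's SimpleStats and GenericRow (not the real classes), and the per-column stats dict mirrors what _get_column_stats produces in the diff.

# Illustrative stand-ins only; the real SimpleStats/GenericRow/_get_column_stats
# live in pypaimon and are shown in the diff below.
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class GenericRowSketch:
    values: List[Any]
    fields: List[Any]

    @property
    def arity(self) -> int:
        return len(self.values)


@dataclass
class SimpleStatsSketch:
    min_values: GenericRowSketch
    max_values: GenericRowSketch
    null_counts: List[int]

    @classmethod
    def empty_stats(cls) -> "SimpleStatsSketch":
        # Same shape as the hand-built SimpleStats(GenericRow([], []), GenericRow([], []), [])
        # that the blob writer previously constructed for key_stats.
        return cls(GenericRowSketch([], []), GenericRowSketch([], []), [])


def collect_value_stats(fields: List[str],
                        column_stats: Dict[str, Dict]) -> SimpleStatsSketch:
    """Build SimpleStats from per-column {'min_values', 'max_values', 'null_counts'} dicts."""
    if not fields:
        return SimpleStatsSketch.empty_stats()
    min_stats = [column_stats[f]['min_values'] for f in fields]
    max_stats = [column_stats[f]['max_values'] for f in fields]
    null_counts = [column_stats[f]['null_counts'] for f in fields]
    return SimpleStatsSketch(GenericRowSketch(min_stats, fields),
                             GenericRowSketch(max_stats, fields),
                             null_counts)


stats = {'name': {'min_values': 'Alice', 'max_values': 'Eve', 'null_counts': 0}}
assert collect_value_stats([], stats).min_values.arity == 0      # stats disabled -> empty stats
assert collect_value_stats(['name'], stats).null_counts == [0]   # stats enabled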
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index 81a7a5baf9..c6223dadcb 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -546,6 +546,84 @@ class ReaderBasicTest(unittest.TestCase):
self.assertFalse(is_system_field,
f"value_stats_cols should not contain system field: {field_name}")
+ def test_value_stats_empty_when_stats_disabled(self):
+ catalog = CatalogFactory.create({
+ "warehouse": self.warehouse
+ })
+ catalog.create_database("test_db_stats_disabled", True)
+
+ pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('name', pa.string()),
+ ('price', pa.float64()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ primary_keys=['id'],
+ options={'metadata.stats-mode': 'none', 'bucket': '2'}  # Stats disabled
+ )
+ catalog.create_table("test_db_stats_disabled.test_stats_disabled", schema, False)
+ table = catalog.get_table("test_db_stats_disabled.test_stats_disabled")
+
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['Alice', 'Bob', 'Charlie'],
+ 'price': [10.5, 20.3, 30.7],
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+ manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+ manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
+ manifest_files[0].file_name,
+ lambda row: table_scan.starting_scanner._filter_manifest_entry(row),
+ False
+ )
+
+ self.assertGreater(len(manifest_entries), 0, "Should have at least one manifest entry")
+ file_meta = manifest_entries[0].file
+
+ self.assertEqual(
+ file_meta.value_stats_cols, [],
+ "value_stats_cols should be empty list [] when stats are disabled"
+ )
+
+ self.assertEqual(
+ file_meta.value_stats.min_values.arity, 0,
+ "value_stats.min_values should be empty (arity=0) when stats are
disabled"
+ )
+ self.assertEqual(
+ file_meta.value_stats.max_values.arity, 0,
+ "value_stats.max_values should be empty (arity=0) when stats are
disabled"
+ )
+ self.assertEqual(
+ len(file_meta.value_stats.null_counts), 0,
+ "value_stats.null_counts should be empty when stats are disabled"
+ )
+
+ empty_stats = SimpleStats.empty_stats()
+ self.assertEqual(
+ file_meta.value_stats.min_values.arity, len(empty_stats.min_values),
+ "value_stats.min_values should be empty (same as SimpleStats.empty_stats()) when stats are disabled"
+ )
+ self.assertEqual(
+ file_meta.value_stats.max_values.arity, len(empty_stats.max_values),
+ "value_stats.max_values should be empty (same as SimpleStats.empty_stats()) when stats are disabled"
+ )
+ self.assertEqual(
+ len(file_meta.value_stats.null_counts), len(empty_stats.null_counts),
+ "value_stats.null_counts should be empty (same as SimpleStats.empty_stats()) when stats are disabled"
+ )
+
def test_types(self):
data_fields = [
DataField(0, "f0", AtomicType('TINYINT'), 'desc'),
@@ -776,12 +854,8 @@ class ReaderBasicTest(unittest.TestCase):
self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)
def _test_append_only_schema_match_case(self, table, pa_schema):
- """Test that for append-only tables, data.schema matches table.fields.
+ from pypaimon.schema.data_types import PyarrowFieldParser
- This verifies the assumption in data_writer.py that for append-only tables,
- PyarrowFieldParser.to_paimon_schema(data.schema) should have the same fields
- as self.table.fields (same count and same field names).
- """
self.assertFalse(table.is_primary_key_table,
"Table should be append-only (no primary keys)")
@@ -789,7 +863,46 @@ class ReaderBasicTest(unittest.TestCase):
'id': [1, 2, 3],
'name': ['Alice', 'Bob', 'Charlie'],
'price': [10.5, 20.3, 30.7],
- 'category': ['A', 'B', 'C'],
+ 'category': ['A', 'B', 'C']
+ }, schema=pa_schema)
+
+ data_fields_from_schema = PyarrowFieldParser.to_paimon_schema(test_data.schema)
+ table_fields = table.fields
+
+ self.assertEqual(
+ len(data_fields_from_schema), len(table_fields),
+ f"Field count mismatch: data.schema has
{len(data_fields_from_schema)} fields, "
+ f"but table.fields has {len(table_fields)} fields"
+ )
+
+ data_field_names = {field.name for field in data_fields_from_schema}
+ table_field_names = {field.name for field in table_fields}
+ self.assertEqual(
+ data_field_names, table_field_names,
+ f"Field names mismatch: data.schema has {data_field_names}, "
+ f"but table.fields has {table_field_names}"
+ )
+
+ def test_primary_key_value_stats(self):
+ pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('name', pa.string()),
+ ('price', pa.float64()),
+ ('category', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ primary_keys=['id'],
+ options={'metadata.stats-mode': 'full', 'bucket': '2'}
+ )
+ self.catalog.create_table('default.test_pk_value_stats', schema, False)
+ table = self.catalog.get_table('default.test_pk_value_stats')
+
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3, 4, 5],
+ 'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
+ 'price': [10.5, 20.3, 30.7, 40.1, 50.9],
+ 'category': ['A', 'B', 'C', 'D', 'E']
}, schema=pa_schema)
write_builder = table.new_batch_write_builder()
@@ -831,6 +944,71 @@ class ReaderBasicTest(unittest.TestCase):
file_meta = manifest_entries[0].file
self.assertIsNone(file_meta.value_stats_cols,
"value_stats_cols should be None when all table
fields are included")
+ self.assertGreater(len(manifest_entries), 0, "Should have at least one
manifest entry")
+ file_meta = manifest_entries[0].file
+
+ key_stats = file_meta.key_stats
+ self.assertIsNotNone(key_stats, "key_stats should not be None")
+ self.assertGreater(key_stats.min_values.arity, 0, "key_stats should contain key fields")
+ self.assertEqual(key_stats.min_values.arity, 1, "key_stats should contain exactly 1 key field (id)")
+
+ value_stats = file_meta.value_stats
+ self.assertIsNotNone(value_stats, "value_stats should not be None")
+
+ if file_meta.value_stats_cols is None:
+ expected_value_fields = ['name', 'price', 'category']
+ self.assertGreaterEqual(value_stats.min_values.arity, len(expected_value_fields),
+ f"value_stats should contain at least {len(expected_value_fields)} value fields")
+ else:
+ self.assertNotIn('id', file_meta.value_stats_cols,
+ "Key field 'id' should NOT be in value_stats_cols")
+
+ expected_value_fields = ['name', 'price', 'category']
+ self.assertTrue(set(expected_value_fields).issubset(set(file_meta.value_stats_cols)),
+ f"value_stats_cols should contain value fields: {expected_value_fields}, "
+ f"but got: {file_meta.value_stats_cols}")
+
+ expected_arity = len(file_meta.value_stats_cols)
+ self.assertEqual(value_stats.min_values.arity, expected_arity,
+ f"value_stats should contain {expected_arity}
fields (matching value_stats_cols), "
+ f"but got {value_stats.min_values.arity}")
+ self.assertEqual(value_stats.max_values.arity, expected_arity,
+ f"value_stats should contain {expected_arity}
fields (matching value_stats_cols), "
+ f"but got {value_stats.max_values.arity}")
+ self.assertEqual(len(value_stats.null_counts), expected_arity,
+ f"value_stats null_counts should have
{expected_arity} elements, "
+ f"but got {len(value_stats.null_counts)}")
+
+ self.assertEqual(value_stats.min_values.arity,
len(file_meta.value_stats_cols),
+ f"value_stats.min_values.arity
({value_stats.min_values.arity}) must match "
+ f"value_stats_cols length
({len(file_meta.value_stats_cols)})")
+
+ for field_name in file_meta.value_stats_cols:
+ is_system_field = (field_name.startswith('_KEY_') or
+ field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID'])
+ self.assertFalse(is_system_field,
+ f"value_stats_cols should not contain system field: {field_name}")
+
+ value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields(
+ {'_VALUE_STATS_COLS': file_meta.value_stats_cols},
+ table.fields
+ )
+ min_value_stats = GenericRowDeserializer.from_bytes(
+ value_stats.min_values.data,
+ value_stats_fields
+ ).values
+ max_value_stats = GenericRowDeserializer.from_bytes(
+ value_stats.max_values.data,
+ value_stats_fields
+ ).values
+
+ self.assertEqual(len(min_value_stats), 3, "min_value_stats should have 3 values")
+ self.assertEqual(len(max_value_stats), 3, "max_value_stats should have 3 values")
+
+ actual_data = read_builder.new_read().to_arrow(table_scan.plan().splits())
+ self.assertEqual(actual_data.num_rows, 5, "Should have 5 rows")
+ actual_ids = sorted(actual_data.column('id').to_pylist())
+ self.assertEqual(actual_ids, [1, 2, 3, 4, 5], "All IDs should be present")
def test_split_target_size(self):
"""Test source.split.target-size configuration effect on split
generation."""
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py
b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index eaf2b9483c..8cdd7428dc 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -276,14 +276,7 @@ class DataBlobWriter(DataWriter):
# Column stats (only for normal columns)
metadata_stats_enabled = self.options.metadata_stats_enabled()
stats_columns = self.normal_columns if metadata_stats_enabled else []
- column_stats = {
- field.name: self._get_column_stats(data, field.name)
- for field in stats_columns
- }
-
- min_value_stats = [column_stats[field.name]['min_values'] for field in stats_columns]
- max_value_stats = [column_stats[field.name]['max_values'] for field in stats_columns]
- value_null_counts = [column_stats[field.name]['null_counts'] for field in stats_columns]
+ value_stats = self._collect_value_stats(data, stats_columns)
self.sequence_generator.start = self.sequence_generator.current
@@ -293,14 +286,8 @@ class DataBlobWriter(DataWriter):
row_count=data.num_rows,
min_key=GenericRow([], []),
max_key=GenericRow([], []),
- key_stats=SimpleStats(
- GenericRow([], []),
- GenericRow([], []),
- []),
- value_stats=SimpleStats(
- GenericRow(min_value_stats, stats_columns),
- GenericRow(max_value_stats, stats_columns),
- value_null_counts),
+ key_stats=SimpleStats.empty_stats(),
+ value_stats=value_stats,
min_sequence_number=-1,
max_sequence_number=-1,
schema_id=self.table.table_schema.id,
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 38dd58c15e..fa5f004b8e 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -201,17 +201,14 @@ class DataWriter(ABC):
field.name: self._get_column_stats(data, field.name)
for field in stats_fields
}
- data_fields = stats_fields if value_stats_enabled else []
- min_value_stats = [column_stats[field.name]['min_values'] for field in data_fields]
- max_value_stats = [column_stats[field.name]['max_values'] for field in data_fields]
- value_null_counts = [column_stats[field.name]['null_counts'] for field in data_fields]
key_fields = self.trimmed_primary_keys_fields
- min_key_stats = [column_stats[field.name]['min_values'] for field in key_fields]
- max_key_stats = [column_stats[field.name]['max_values'] for field in key_fields]
- key_null_counts = [column_stats[field.name]['null_counts'] for field in key_fields]
- if not all(count == 0 for count in key_null_counts):
+ key_stats = self._collect_value_stats(data, key_fields, column_stats)
+ if not all(count == 0 for count in key_stats.null_counts):
raise RuntimeError("Primary key should not be null")
+ value_fields = stats_fields if value_stats_enabled else []
+ value_stats = self._collect_value_stats(data, value_fields, column_stats)
+
min_seq = self.sequence_generator.start
max_seq = self.sequence_generator.current
self.sequence_generator.start = self.sequence_generator.current
@@ -221,16 +218,8 @@ class DataWriter(ABC):
row_count=data.num_rows,
min_key=GenericRow(min_key, self.trimmed_primary_keys_fields),
max_key=GenericRow(max_key, self.trimmed_primary_keys_fields),
- key_stats=SimpleStats(
- GenericRow(min_key_stats, self.trimmed_primary_keys_fields),
- GenericRow(max_key_stats, self.trimmed_primary_keys_fields),
- key_null_counts,
- ),
- value_stats=SimpleStats(
- GenericRow(min_value_stats, data_fields),
- GenericRow(max_value_stats, data_fields),
- value_null_counts,
- ),
+ key_stats=key_stats,
+ value_stats=value_stats,
min_sequence_number=min_seq,
max_sequence_number=max_seq,
schema_id=self.table.table_schema.id,
@@ -278,6 +267,27 @@ class DataWriter(ABC):
return best_split
+ def _collect_value_stats(self, data: pa.Table, fields: List,
+ column_stats: Optional[Dict[str, Dict]] = None) -> SimpleStats:
+ if not fields:
+ return SimpleStats.empty_stats()
+
+ if column_stats is None or not column_stats:
+ column_stats = {
+ field.name: self._get_column_stats(data, field.name)
+ for field in fields
+ }
+
+ min_stats = [column_stats[field.name]['min_values'] for field in fields]
+ max_stats = [column_stats[field.name]['max_values'] for field in fields]
+ null_counts = [column_stats[field.name]['null_counts'] for field in fields]
+
+ return SimpleStats(
+ GenericRow(min_stats, fields),
+ GenericRow(max_stats, fields),
+ null_counts
+ )
+
@staticmethod
def _get_column_stats(record_batch: pa.RecordBatch, column_name: str) -> Dict:
column_array = record_batch.column(column_name)
@@ -287,6 +297,17 @@ class DataWriter(ABC):
"max_values": None,
"null_counts": column_array.null_count,
}
+
+ column_type = column_array.type
+ supports_minmax = not (pa.types.is_nested(column_type) or pa.types.is_map(column_type))
+
+ if not supports_minmax:
+ return {
+ "min_values": None,
+ "max_values": None,
+ "null_counts": column_array.null_count,
+ }
+
min_values = pc.min(column_array).as_py()
max_values = pc.max(column_array).as_py()
null_counts = column_array.null_count
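As a usage note on the nested-type guard added to _get_column_stats above: pyarrow.compute.min/max are only taken for non-nested columns, while list/struct/map columns fall back to null min/max with only the null count recorded. A small standalone illustration with plain pyarrow follows; the helper name is just for this sketch, not pypaimon's API.

import pyarrow as pa
import pyarrow.compute as pc


def sketch_column_stats(table: pa.Table, column_name: str) -> dict:
    """Min/max/null_count for one column; nested types get no min/max."""
    column_array = table.column(column_name)
    column_type = column_array.type
    # pa.types.is_nested covers list/struct/map (a map is itself a nested type).
    if pa.types.is_nested(column_type):
        return {"min_values": None, "max_values": None,
                "null_counts": column_array.null_count}
    return {"min_values": pc.min(column_array).as_py(),
            "max_values": pc.max(column_array).as_py(),
            "null_counts": column_array.null_count}


t = pa.table({"id": [1, 2, None], "tags": [["a"], ["b"], None]})
print(sketch_column_stats(t, "id"))    # {'min_values': 1, 'max_values': 2, 'null_counts': 1}
print(sketch_column_stats(t, "tags"))  # nested column: min/max stay None, null_counts == 1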