This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 24e08a4a78 [python] Let Python write file without value stats by default (#6940)
24e08a4a78 is described below

commit 24e08a4a78593622c8ad78e2f919580edddb394e
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Dec 31 22:00:38 2025 +0800

    [python] Let Python write file without value stats by default (#6940)
---
 .../pypaimon/common/options/core_options.py        | 10 +++++++
 .../pypaimon/manifest/schema/simple_stats.py       |  2 +-
 paimon-python/pypaimon/tests/predicates_test.py    |  2 +-
 .../pypaimon/tests/py36/ao_predicate_test.py       |  2 +-
 .../pypaimon/tests/py36/rest_ao_read_write_test.py | 35 +++++++++++++++-------
 paimon-python/pypaimon/tests/reader_base_test.py   | 33 ++++++++++++++------
 .../pypaimon/tests/schema_evolution_read_test.py   |  8 +++--
 .../pypaimon/write/writer/data_blob_writer.py      | 30 +++++++++----------
 paimon-python/pypaimon/write/writer/data_writer.py | 17 ++++++-----
 9 files changed, 91 insertions(+), 48 deletions(-)

diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index 10ff120d75..4ab5a253d7 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -153,6 +153,13 @@ class CoreOptions:
         .with_description("Define the data block size.")
     )
 
+    METADATA_STATS_MODE: ConfigOption[str] = (
+        ConfigOptions.key("metadata.stats-mode")
+        .string_type()
+        .default_value("none")
+        .with_description("Stats Mode, Python by default is none. Java is 
truncate(16).")
+    )
+
     BLOB_AS_DESCRIPTOR: ConfigOption[bool] = (
         ConfigOptions.key("blob-as-descriptor")
         .boolean_type()
@@ -317,6 +324,9 @@ class CoreOptions:
     def file_block_size(self, default=None):
         return self.options.get(CoreOptions.FILE_BLOCK_SIZE, default)
 
+    def metadata_stats_enabled(self, default=None):
+        return self.options.get(CoreOptions.METADATA_STATS_MODE, default) == "full"
+
     def blob_as_descriptor(self, default=None):
         return self.options.get(CoreOptions.BLOB_AS_DESCRIPTOR, default)
 
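Note that metadata_stats_enabled() above compares the configured mode against the exact string "full", so any other value, including Java's default "truncate(16)", leaves value stats disabled on the Python side. A minimal sketch of how the flag resolves, using plain dict lookups instead of the ConfigOption machinery:

    # Python default: 'metadata.stats-mode' is 'none', so stats stay disabled
    options = {}
    assert (options.get('metadata.stats-mode', 'none') == 'full') is False

    # Java-style modes such as 'truncate(16)' are also treated as disabled here
    options = {'metadata.stats-mode': 'truncate(16)'}
    assert (options.get('metadata.stats-mode', 'none') == 'full') is False

    # only the exact value 'full' turns value stats on
    options = {'metadata.stats-mode': 'full'}
    assert options.get('metadata.stats-mode', 'none') == 'full'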
diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py b/paimon-python/pypaimon/manifest/schema/simple_stats.py
index 10d9e62420..fc16d351b7 100644
--- a/paimon-python/pypaimon/manifest/schema/simple_stats.py
+++ b/paimon-python/pypaimon/manifest/schema/simple_stats.py
@@ -37,7 +37,7 @@ class SimpleStats:
         if cls._empty_stats is None:
             min_values = GenericRow([], [])
             max_values = GenericRow([], [])
-            cls._empty_stats = cls(min_values, max_values, None)
+            cls._empty_stats = cls(min_values, max_values, [])
         return cls._empty_stats
 
 
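This pairs with a new convention for DataFileMeta.value_stats_cols: None keeps its old meaning of "all columns carry statistics", while an empty list now means "no value statistics", backed by the empty stats rows above. A sketch of how a reader could distinguish the two (resolve_stats_fields is a hypothetical helper, not part of this commit):

    def resolve_stats_fields(value_stats_cols, table_fields):
        # None means every column in the file carries statistics
        if value_stats_cols is None:
            return table_fields
        # an empty list means the file carries no value statistics at all
        return [f for f in table_fields if f.name in value_stats_cols]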
diff --git a/paimon-python/pypaimon/tests/predicates_test.py b/paimon-python/pypaimon/tests/predicates_test.py
index 6e6de2fcae..f54a18dd93 100644
--- a/paimon-python/pypaimon/tests/predicates_test.py
+++ b/paimon-python/pypaimon/tests/predicates_test.py
@@ -52,7 +52,7 @@ class PredicateTest(unittest.TestCase):
             ('f1', pa.string()),
         ])
         cls.catalog.create_table('default.test_append', Schema.from_pyarrow_schema(
-            pa_schema, options={'file.format': _random_format()}), False)
+            pa_schema, options={'file.format': _random_format(), 'metadata.stats-mode': 'full'}), False)
         cls.catalog.create_table('default.test_pk', Schema.from_pyarrow_schema(
             pa_schema, primary_keys=['f0'], options={'bucket': '1', 'file.format': _random_format()}), False)
 
diff --git a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
index b06a69baa8..a3b3e0ec9f 100644
--- a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
@@ -40,7 +40,7 @@ class AOPredicatePy36Test(unittest.TestCase):
             ('f1', pa.string()),
         ])
         cls.catalog.create_table('default.test_append', Schema.from_pyarrow_schema(
-            pa_schema, options={'file.format': _random_format()}), False)
+            pa_schema, options={'file.format': _random_format(), 'metadata.stats-mode': 'full'}), False)
 
         df = pd.DataFrame({
             'f0': [1, 2, 3, 4, 5],
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index cb85de28b5..7f27b15b31 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -17,6 +17,7 @@ limitations under the License.
 """
 import logging
 import time
+import random
 from datetime import date
 from decimal import Decimal
 from unittest.mock import Mock
@@ -143,7 +144,9 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
             ('f10', pa.decimal128(10, 2)),
             ('f11', pa.date32()),
         ])
-        schema = Schema.from_pyarrow_schema(simple_pa_schema)
+        stats_enabled = random.random() < 0.5
+        options = {'metadata.stats-mode': 'full'} if stats_enabled else {}
+        schema = Schema.from_pyarrow_schema(simple_pa_schema, options=options)
         self.rest_catalog.create_table('default.test_full_data_types', schema, False)
         table = self.rest_catalog.get_table('default.test_full_data_types')
 
@@ -183,14 +186,25 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
             manifest_files[0].file_name,
             lambda row: table_scan.starting_scanner._filter_manifest_entry(row),
             drop_stats=False)
-        min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
-                                                            table.fields).values
-        max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
-                                                            table.fields).values
-        expected_min_values = [col[0].as_py() for col in expect_data]
-        expected_max_values = [col[1].as_py() for col in expect_data]
-        self.assertEqual(min_value_stats, expected_min_values)
-        self.assertEqual(max_value_stats, expected_max_values)
+        # By default, the Python writer does not produce value stats
+        if stats_enabled:
+            self.assertEqual(manifest_entries[0].file.value_stats_cols, None)
+            min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
+                                                                table.fields).values
+            max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
+                                                                table.fields).values
+            expected_min_values = [col[0].as_py() for col in expect_data]
+            expected_max_values = [col[1].as_py() for col in expect_data]
+            self.assertEqual(min_value_stats, expected_min_values)
+            self.assertEqual(max_value_stats, expected_max_values)
+        else:
+            self.assertEqual(manifest_entries[0].file.value_stats_cols, [])
+            min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
+                                                                []).values
+            max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
+                                                                []).values
+            self.assertEqual(min_value_stats, [])
+            self.assertEqual(max_value_stats, [])
 
     def test_mixed_add_and_delete_entries_same_partition(self):
         """Test record_count calculation with mixed ADD/DELETE entries in same 
partition."""
@@ -458,7 +472,8 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
         self.assertEqual(result.to_dict(), test_df.to_dict())
 
     def test_append_only_reader_with_filter(self):
-        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        options = {'metadata.stats-mode': 'full'}
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options=options)
         self.rest_catalog.create_table('default.test_append_only_filter', schema, False)
         table = self.rest_catalog.get_table('default.test_append_only_filter')
         self._write_test_table(table)
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index 1455d9ad31..92a275585c 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -21,6 +21,7 @@ import os
 import shutil
 import tempfile
 import unittest
+import random
 from datetime import date, datetime, time
 from decimal import Decimal
 from unittest.mock import Mock
@@ -178,7 +179,9 @@ class ReaderBasicTest(unittest.TestCase):
             ('f12', pa.date32()),
             ('f13', pa.time64('us')),
         ])
-        schema = Schema.from_pyarrow_schema(simple_pa_schema)
+        stats_enabled = random.random() < 0.5
+        options = {'metadata.stats-mode': 'full'} if stats_enabled else {}
+        schema = Schema.from_pyarrow_schema(simple_pa_schema, options=options)
         self.catalog.create_table('default.test_full_data_types', schema, False)
         table = self.catalog.get_table('default.test_full_data_types')
 
@@ -226,14 +229,26 @@ class ReaderBasicTest(unittest.TestCase):
         manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
         manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
             manifest_files[0].file_name, lambda row: table_scan.starting_scanner._filter_manifest_entry(row), False)
-        min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
-                                                            table.fields).values
-        max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
-                                                            table.fields).values
-        expected_min_values = [col[0].as_py() for col in expect_data]
-        expected_max_values = [col[1].as_py() for col in expect_data]
-        self.assertEqual(min_value_stats, expected_min_values)
-        self.assertEqual(max_value_stats, expected_max_values)
+
+        # By default, the Python writer does not produce value stats
+        if stats_enabled:
+            self.assertEqual(manifest_entries[0].file.value_stats_cols, None)
+            min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
+                                                                table.fields).values
+            max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
+                                                                table.fields).values
+            expected_min_values = [col[0].as_py() for col in expect_data]
+            expected_max_values = [col[1].as_py() for col in expect_data]
+            self.assertEqual(min_value_stats, expected_min_values)
+            self.assertEqual(max_value_stats, expected_max_values)
+        else:
+            self.assertEqual(manifest_entries[0].file.value_stats_cols, [])
+            min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
+                                                                []).values
+            max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
+                                                                []).values
+            self.assertEqual(min_value_stats, [])
+            self.assertEqual(max_value_stats, [])
 
     def test_write_wrong_schema(self):
         self.catalog.create_table('default.test_wrong_schema',
diff --git a/paimon-python/pypaimon/tests/schema_evolution_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
index 2ff4b09e53..f5dafaae35 100644
--- a/paimon-python/pypaimon/tests/schema_evolution_read_test.py
+++ b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
@@ -210,7 +210,8 @@ class SchemaEvolutionReadTest(unittest.TestCase):
             ('item_id', pa.int64()),
             ('dt', pa.string())
         ])
-        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+        options = {'metadata.stats-mode': 'full'}
+        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'], options=options)
         self.catalog.create_table('default.test_schema_evolution1', schema, False)
         table1 = self.catalog.get_table('default.test_schema_evolution1')
         write_builder = table1.new_batch_write_builder()
@@ -275,7 +276,8 @@ class SchemaEvolutionReadTest(unittest.TestCase):
             ('item_id', pa.int64()),
             ('dt', pa.string())
         ])
-        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+        options = {'metadata.stats-mode': 'full'}
+        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'], options=options)
         self.catalog.create_table('default.test_schema_evolution_with_filter', schema, False)
         table1 = self.catalog.get_table('default.test_schema_evolution_with_filter')
         write_builder = table1.new_batch_write_builder()
@@ -299,7 +301,7 @@ class SchemaEvolutionReadTest(unittest.TestCase):
             ('dt', pa.string()),
             ('behavior', pa.string())
         ])
-        schema2 = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+        schema2 = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'], 
options=options)
         self.catalog.create_table('default.test_schema_evolution_with_filter2', schema2, False)
         table2 = self.catalog.get_table('default.test_schema_evolution_with_filter2')
         table2.table_schema.id = 1
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index c7dd1e890c..eaf2b9483c 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -85,6 +85,7 @@ class DataBlobWriter(DataWriter):
         # Split schema into normal and blob columns
         all_column_names = self.table.field_names
         self.normal_column_names = [col for col in all_column_names if col != self.blob_column_name]
+        self.normal_columns = [field for field in self.table.table_schema.fields if field.name != self.blob_column_name]
         self.write_cols = self.normal_column_names
 
         # State management for blob writer
@@ -196,13 +197,15 @@ class DataBlobWriter(DataWriter):
 
         return normal_data, blob_data
 
-    def _process_normal_data(self, data: pa.RecordBatch) -> pa.Table:
+    @staticmethod
+    def _process_normal_data(data: pa.RecordBatch) -> pa.Table:
         """Process normal data (similar to base DataWriter)."""
         if data is None or data.num_rows == 0:
             return pa.Table.from_batches([])
         return pa.Table.from_batches([data])
 
-    def _merge_normal_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
+    @staticmethod
+    def _merge_normal_data(existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
         return pa.concat_tables([existing_data, new_data])
 
     def _should_roll_normal(self) -> bool:
@@ -243,7 +246,7 @@ class DataBlobWriter(DataWriter):
         logger.info(f"Closed both writers - normal: {normal_meta.file_name}, "
                     f"added {len(blob_metas)} blob file metadata after normal 
metadata")
 
-    def _write_normal_data_to_file(self, data: pa.Table) -> DataFileMeta:
+    def _write_normal_data_to_file(self, data: pa.Table) -> Optional[DataFileMeta]:
         if data.num_rows == 0:
             return None
 
@@ -271,19 +274,16 @@ class DataBlobWriter(DataWriter):
     def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
                                external_path: Optional[str] = None) -> DataFileMeta:
         # Column stats (only for normal columns)
+        metadata_stats_enabled = self.options.metadata_stats_enabled()
+        stats_columns = self.normal_columns if metadata_stats_enabled else []
         column_stats = {
             field.name: self._get_column_stats(data, field.name)
-            for field in self.table.table_schema.fields
-            if field.name != self.blob_column_name
+            for field in stats_columns
         }
 
-        # Get normal fields only
-        normal_fields = [field for field in self.table.table_schema.fields
-                         if field.name != self.blob_column_name]
-
-        min_value_stats = [column_stats[field.name]['min_values'] for field in normal_fields]
-        max_value_stats = [column_stats[field.name]['max_values'] for field in normal_fields]
-        value_null_counts = [column_stats[field.name]['null_counts'] for field in normal_fields]
+        min_value_stats = [column_stats[field.name]['min_values'] for field in stats_columns]
+        max_value_stats = [column_stats[field.name]['max_values'] for field in stats_columns]
+        value_null_counts = [column_stats[field.name]['null_counts'] for field in stats_columns]
 
         self.sequence_generator.start = self.sequence_generator.current
 
@@ -298,8 +298,8 @@ class DataBlobWriter(DataWriter):
                 GenericRow([], []),
                 []),
             value_stats=SimpleStats(
-                GenericRow(min_value_stats, normal_fields),
-                GenericRow(max_value_stats, normal_fields),
+                GenericRow(min_value_stats, stats_columns),
+                GenericRow(max_value_stats, stats_columns),
                 value_null_counts),
             min_sequence_number=-1,
             max_sequence_number=-1,
@@ -309,7 +309,7 @@ class DataBlobWriter(DataWriter):
             creation_time=Timestamp.now(),
             delete_row_count=0,
             file_source=0,
-            value_stats_cols=self.normal_column_names,
+            value_stats_cols=[column.name for column in stats_columns],
             external_path=external_path,
             file_path=file_path,
             write_cols=self.write_cols)
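
When stats are disabled (now the default), stats_columns is empty and the blob writer's metadata degenerates to empty stats, consistent with SimpleStats.empty_stats() earlier in this patch. Roughly, as a sketch rather than verbatim code:

    # stats_columns == [] whenever 'metadata.stats-mode' != 'full'
    value_stats = SimpleStats(GenericRow([], []), GenericRow([], []), [])  # empty min/max/null counts
    value_stats_cols = []  # [] means "no value stats"; None means "stats for all columns"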
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index d1559bc051..73609ed912 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -191,16 +191,17 @@ class DataWriter(ABC):
         max_key = [col.to_pylist()[0] for col in max_key_row_batch.columns]
 
         # key stats & value stats
-        data_fields = self.table.fields if self.table.is_primary_key_table \
-            else PyarrowFieldParser.to_paimon_schema(data.schema)
+        value_stats_enabled = self.options.metadata_stats_enabled()
+        stats_fields = PyarrowFieldParser.to_paimon_schema(data.schema) if value_stats_enabled\
+            else self.table.trimmed_primary_keys_fields
         column_stats = {
             field.name: self._get_column_stats(data, field.name)
-            for field in data_fields
+            for field in stats_fields
         }
-        all_fields = data_fields
-        min_value_stats = [column_stats[field.name]['min_values'] for field in all_fields]
-        max_value_stats = [column_stats[field.name]['max_values'] for field in all_fields]
-        value_null_counts = [column_stats[field.name]['null_counts'] for field in all_fields]
+        data_fields = stats_fields if value_stats_enabled else []
+        min_value_stats = [column_stats[field.name]['min_values'] for field in data_fields]
+        max_value_stats = [column_stats[field.name]['max_values'] for field in data_fields]
+        value_null_counts = [column_stats[field.name]['null_counts'] for field in data_fields]
         key_fields = self.trimmed_primary_keys_fields
         min_key_stats = [column_stats[field.name]['min_values'] for field in key_fields]
         max_key_stats = [column_stats[field.name]['max_values'] for field in key_fields]
@@ -235,7 +236,7 @@ class DataWriter(ABC):
             creation_time=Timestamp.now(),
             delete_row_count=0,
             file_source=0,
-            value_stats_cols=None,  # None means all columns in the data have statistics
+            value_stats_cols=None if value_stats_enabled else [],
             external_path=external_path_str,  # Set external path if using external paths
             first_row_id=None,
             write_cols=self.write_cols,
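
Note that even with value stats disabled, the writer still computes stats for the trimmed primary key fields, since key min/max stats are required regardless; only the value stats lists are emptied. For tables that still want Java-style statistics, the tests above show the opt-in path: set 'metadata.stats-mode' to 'full' in the table options. A minimal sketch (the Schema import path is assumed):

    import pyarrow as pa
    from pypaimon import Schema  # import path assumed

    pa_schema = pa.schema([('f0', pa.int64()), ('f1', pa.string())])

    # default after this commit: data files are written without value stats
    schema_no_stats = Schema.from_pyarrow_schema(pa_schema)

    # opt back in to full value stats on a per-table basis
    schema_full_stats = Schema.from_pyarrow_schema(
        pa_schema, options={'metadata.stats-mode': 'full'})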
