This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 662a56662a [python] fix value_stats containing system fields for primary key tables (#6945)
662a56662a is described below
commit 662a56662aab574b6a10ac696a8a6c4d3c7a90c1
Author: XiaoHongbo <[email protected]>
AuthorDate: Sat Jan 3 20:53:01 2026 +0800
[python] fix value_stats containing system fields for primary key tables (#6945)
---
.../paimon/table/PrimaryKeySimpleTableTest.java | 29 +++++
paimon-python/pypaimon/tests/reader_base_test.py | 132 +++++++++++++++++++++
paimon-python/pypaimon/write/writer/data_writer.py | 7 +-
3 files changed, 166 insertions(+), 2 deletions(-)
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 3b9d7d2831..ef0ab9de24 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -101,6 +101,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
@@ -142,6 +143,7 @@ import static
org.apache.paimon.predicate.PredicateBuilder.and;
import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
+import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -248,6 +250,33 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
file = ((DataSplit)
table.newScan().plan().splits().get(0)).dataFiles().get(0);
assertThat(file.level()).isEqualTo(5);
assertThat(file.valueStats().maxValues().getFieldCount()).isGreaterThan(4);
+
+ if (file.valueStatsCols() == null) {
+ int expectedFieldCount = table.schema().fields().size();
+ int actualFieldCount =
file.valueStats().minValues().getFieldCount();
+ assertThat(actualFieldCount)
+ .as(
+ "When value_stats_cols is null, value_stats field
count should match table.fields count. "
+ + "This ensures value_stats does NOT
contain system fields.")
+ .isEqualTo(expectedFieldCount);
+ } else {
+ for (String fieldName :
Objects.requireNonNull(file.valueStatsCols())) {
+ boolean isSystemField =
+ fieldName.startsWith(KEY_FIELD_PREFIX)
+ || SpecialFields.isSystemField(fieldName);
+ assertThat(isSystemField)
+ .as("value_stats_cols should NOT contain system field:
" + fieldName)
+ .isFalse();
+ }
+ assertThat(file.valueStats().minValues().getFieldCount())
+ .as("value_stats field count should match value_stats_cols
size")
+
.isEqualTo(Objects.requireNonNull(file.valueStatsCols()).size());
+ }
+
+ assertThat(file.valueStats().minValues().getFieldCount())
+ .isEqualTo(file.valueStats().maxValues().getFieldCount());
+ assertThat(file.valueStats().nullCounts().size())
+ .isEqualTo(file.valueStats().minValues().getFieldCount());
}
@Test
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index 92a275585c..81a7a5baf9 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -471,6 +471,81 @@ class ReaderBasicTest(unittest.TestCase):
test_name="specific_case"
)
+ schema_with_stats = Schema.from_pyarrow_schema(pa_schema,
options={'metadata.stats-mode': 'full'})
+ catalog.create_table("test_db.test_value_stats_cols_schema_match",
schema_with_stats, False)
+ table_with_stats =
catalog.get_table("test_db.test_value_stats_cols_schema_match")
+ self._test_append_only_schema_match_case(table_with_stats, pa_schema)
+
+ def test_primary_key_value_stats_excludes_system_fields(self):
+ catalog = CatalogFactory.create({
+ "warehouse": self.warehouse
+ })
+ catalog.create_database("test_db_system_fields", True)
+
+ pk_pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('name', pa.string()),
+ ('price', pa.float64()),
+ ])
+ pk_schema = Schema.from_pyarrow_schema(
+ pk_pa_schema,
+ primary_keys=['id'],
+ options={'metadata.stats-mode': 'full', 'bucket': '2'}
+ )
+
catalog.create_table("test_db_system_fields.test_pk_value_stats_system_fields",
pk_schema, False)
+ pk_table =
catalog.get_table("test_db_system_fields.test_pk_value_stats_system_fields")
+
+ pk_test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['Alice', 'Bob', 'Charlie'],
+ 'price': [10.5, 20.3, 30.7],
+ }, schema=pk_pa_schema)
+
+ pk_write_builder = pk_table.new_batch_write_builder()
+ pk_writer = pk_write_builder.new_write()
+ pk_writer.write_arrow(pk_test_data)
+ pk_commit_messages = pk_writer.prepare_commit()
+ pk_commit = pk_write_builder.new_commit()
+ pk_commit.commit(pk_commit_messages)
+ pk_writer.close()
+
+ pk_read_builder = pk_table.new_read_builder()
+ pk_table_scan = pk_read_builder.new_scan()
+ latest_snapshot = SnapshotManager(pk_table).get_latest_snapshot()
+ pk_manifest_files =
pk_table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+ pk_manifest_entries =
pk_table_scan.starting_scanner.manifest_file_manager.read(
+ pk_manifest_files[0].file_name,
+ lambda row:
pk_table_scan.starting_scanner._filter_manifest_entry(row),
+ False
+ )
+
+ self.assertGreater(len(pk_manifest_entries), 0, "Should have at least
one manifest entry")
+ pk_file_meta = pk_manifest_entries[0].file
+
+ pk_table_field_names = {f.name for f in pk_table.fields}
+ system_fields = {'_KEY_id', '_SEQUENCE_NUMBER', '_VALUE_KIND',
'_ROW_ID'}
+ pk_table_has_system_fields = bool(pk_table_field_names & system_fields)
+ self.assertFalse(pk_table_has_system_fields,
+ f"table.fields should NOT contain system fields, but
got: {pk_table_field_names}")
+
+ if pk_file_meta.value_stats_cols is None:
+ pk_value_stats_fields =
pk_table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields(
+ {'_VALUE_STATS_COLS': None},
+ pk_table.fields
+ )
+ expected_count = len(pk_value_stats_fields)
+ actual_count = pk_file_meta.value_stats.min_values.arity
+ self.assertEqual(actual_count, expected_count,
+ f"Field count mismatch: value_stats has
{actual_count} fields, "
+ f"but table.fields has {expected_count} fields. "
+ f"This indicates value_stats contains system
fields that are not in table.fields.")
+ else:
+ for field_name in pk_file_meta.value_stats_cols:
+ is_system_field = (field_name.startswith('_KEY_') or
+ field_name in ['_SEQUENCE_NUMBER',
'_VALUE_KIND', '_ROW_ID'])
+ self.assertFalse(is_system_field,
+ f"value_stats_cols should not contain system
field: {field_name}")
+
def test_types(self):
data_fields = [
DataField(0, "f0", AtomicType('TINYINT'), 'desc'),
@@ -700,6 +775,63 @@ class ReaderBasicTest(unittest.TestCase):
self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)
+ def _test_append_only_schema_match_case(self, table, pa_schema):
+ """Test that for append-only tables, data.schema matches table.fields.
+
+ This verifies the assumption in data_writer.py that for append-only
tables,
+ PyarrowFieldParser.to_paimon_schema(data.schema) should have the same
fields
+ as self.table.fields (same count and same field names).
+ """
+ self.assertFalse(table.is_primary_key_table,
+ "Table should be append-only (no primary keys)")
+
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['Alice', 'Bob', 'Charlie'],
+ 'price': [10.5, 20.3, 30.7],
+ 'category': ['A', 'B', 'C'],
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ # Verify that data.schema (converted to paimon schema) matches
table.fields
+ data_fields_from_schema =
PyarrowFieldParser.to_paimon_schema(test_data.schema)
+ table_fields = table.fields
+
+ # Verify field count matches
+ self.assertEqual(len(data_fields_from_schema), len(table_fields),
+ f"Field count mismatch: data.schema has
{len(data_fields_from_schema)} fields, "
+ f"but table.fields has {len(table_fields)} fields")
+
+ # Verify field names match (order may differ, but names should match)
+ data_field_names = {field.name for field in data_fields_from_schema}
+ table_field_names = {field.name for field in table_fields}
+ self.assertEqual(data_field_names, table_field_names,
+ f"Field names mismatch: data.schema has
{data_field_names}, "
+ f"but table.fields has {table_field_names}")
+
+ # Read manifest to verify value_stats_cols is None (all fields
included)
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+ manifest_files =
table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+ manifest_entries =
table_scan.starting_scanner.manifest_file_manager.read(
+ manifest_files[0].file_name,
+ lambda row:
table_scan.starting_scanner._filter_manifest_entry(row),
+ False
+ )
+
+ if len(manifest_entries) > 0:
+ file_meta = manifest_entries[0].file
+ self.assertIsNone(file_meta.value_stats_cols,
+ "value_stats_cols should be None when all table
fields are included")
+
def test_split_target_size(self):
"""Test source.split.target-size configuration effect on split
generation."""
from pypaimon.common.options.core_options import CoreOptions
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 73609ed912..38dd58c15e 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -192,8 +192,11 @@ class DataWriter(ABC):
# key stats & value stats
value_stats_enabled = self.options.metadata_stats_enabled()
- stats_fields = PyarrowFieldParser.to_paimon_schema(data.schema) if
value_stats_enabled\
- else self.table.trimmed_primary_keys_fields
+ if value_stats_enabled:
+ stats_fields = self.table.fields if
self.table.is_primary_key_table \
+ else PyarrowFieldParser.to_paimon_schema(data.schema)
+ else:
+ stats_fields = self.table.trimmed_primary_keys_fields
column_stats = {
field.name: self._get_column_stats(data, field.name)
for field in stats_fields