This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new ed39c17de2 [python] Fix DLF partition statistical error (#6237)
ed39c17de2 is described below

commit ed39c17de25d655586a02b5598262ce7d04ef052
Author: umi <55790489+discivig...@users.noreply.github.com>
AuthorDate: Thu Sep 11 13:20:23 2025 +0800

    [python] Fix DLF partition statistical error (#6237)
---
 .../pypaimon/tests/py36/ao_read_write_test.py     | 122 +++++++++++++++++++++
 paimon-python/pypaimon/tests/reader_basic_test.py | 122 +++++++++++++++++++++
 paimon-python/pypaimon/write/file_store_commit.py |   6 +-
 3 files changed, 247 insertions(+), 3 deletions(-)

diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
index 8f3201d056..9873d6bf3f 100644
--- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -16,6 +16,8 @@ See the License for the specific language governing permissions and
 limitations under the License.
 """
 import logging
+from datetime import datetime
+from unittest.mock import Mock
 
 import pandas as pd
 import pyarrow as pa
@@ -34,6 +36,7 @@ from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer, Gen
 from pypaimon.table.row.row_kind import RowKind
 from pypaimon.tests.py36.pyarrow_compat import table_sort_by
 from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
+from pypaimon.write.file_store_commit import FileStoreCommit
 
 
 class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
@@ -119,6 +122,125 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
         pd.testing.assert_frame_equal(
             actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
 
+    def test_mixed_add_and_delete_entries_same_partition(self):
+        """Test record_count calculation with mixed ADD/DELETE entries in same partition."""
+        pa_schema = pa.schema([
+            ('region', pa.string()),
+            ('city', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table('default.tb', schema, False)
+        table = self.rest_catalog.get_table('default.tb')
+        partition_fields = [
+            DataField(0, "region", AtomicType("STRING")),
+            DataField(1, "city", AtomicType("STRING"))
+        ]
+        partition = GenericRow(['East', 'Boston'], partition_fields)
+
+        # Create ADD entry
+        add_file_meta = Mock(spec=DataFileMeta)
+        add_file_meta.row_count = 200
+        add_file_meta.file_size = 2048
+        add_file_meta.creation_time = datetime.now()
+
+        add_entry = ManifestEntry(
+            kind=0,  # ADD
+            partition=partition,
+            bucket=0,
+            total_buckets=1,
+            file=add_file_meta
+        )
+
+        # Create DELETE entry
+        delete_file_meta = Mock(spec=DataFileMeta)
+        delete_file_meta.row_count = 80
+        delete_file_meta.file_size = 800
+        delete_file_meta.creation_time = datetime.now()
+
+        delete_entry = ManifestEntry(
+            kind=1,  # DELETE
+            partition=partition,
+            bucket=0,
+            total_buckets=1,
+            file=delete_file_meta
+        )
+        file_store_commit = FileStoreCommit(None, table, "")
+        # Test the method with both entries
+        statistics = file_store_commit._generate_partition_statistics([add_entry, delete_entry])
+
+        # Verify results - should be merged into single partition statistics
+        self.assertEqual(len(statistics), 1)
+        stat = statistics[0]
+
+        # Net record count: +200 + (-80) = 120
+        self.assertEqual(stat.record_count, 120)
+        self.assertEqual(stat.file_count, 0)
+        self.assertEqual(stat.file_size_in_bytes, 1248)
+
+    def test_multiple_partitions_with_different_operations(self):
+        """Test record_count calculation across multiple partitions."""
+        pa_schema = pa.schema([
+            ('region', pa.string()),
+            ('city', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table('default.tb1', schema, False)
+        table = self.rest_catalog.get_table('default.tb1')
+        partition_fields = [
+            DataField(0, "region", AtomicType("STRING")),
+            DataField(1, "city", AtomicType("STRING"))
+        ]
+        partition1 = GenericRow(['East', 'Boston'], partition_fields)
+        file_meta1 = Mock(spec=DataFileMeta)
+        file_meta1.row_count = 150
+        file_meta1.file_size = 1500
+        file_meta1.creation_time = datetime.now()
+
+        entry1 = ManifestEntry(
+            kind=0,  # ADD
+            partition=partition1,
+            bucket=0,
+            total_buckets=1,
+            file=file_meta1
+        )
+
+        # Partition 2: South/LA - DELETE operation
+        partition2 = GenericRow(['South', 'LA'], partition_fields)
+        file_meta2 = Mock(spec=DataFileMeta)
+        file_meta2.row_count = 75
+        file_meta2.file_size = 750
+        file_meta2.creation_time = datetime.now()
+
+        entry2 = ManifestEntry(
+            kind=1,  # DELETE
+            partition=partition2,
+            bucket=0,
+            total_buckets=1,
+            file=file_meta2
+        )
+
+        file_store_commit = FileStoreCommit(None, table, "")
+        # Test the method with both entries
+        statistics = file_store_commit._generate_partition_statistics([entry1, entry2])
+
+        # Verify results - should have 2 separate partition statistics
+        self.assertEqual(len(statistics), 2)
+
+        # Sort by partition spec for consistent testing
+        statistics.sort(key=lambda s: (s.spec.get('region', ''), s.spec.get('city', '')))
+
+        # Check East/Boston partition (ADD)
+        north_stat = statistics[0]
+        self.assertEqual(north_stat.record_count, 150)  # Positive for ADD
+        self.assertEqual(north_stat.file_count, 1)
+        self.assertEqual(north_stat.file_size_in_bytes, 1500)
+
+        # Check South/LA partition (DELETE)
+        south_stat = statistics[1]
+        self.assertEqual(south_stat.record_count, -75)  # Negative for DELETE
+        self.assertEqual(south_stat.file_count, -1)
+        self.assertEqual(south_stat.file_size_in_bytes, -750)
+
     def testParquetAppendOnlyReader(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
         self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
diff --git a/paimon-python/pypaimon/tests/reader_basic_test.py b/paimon-python/pypaimon/tests/reader_basic_test.py
index 8354ed206a..62255c8a79 100644
--- a/paimon-python/pypaimon/tests/reader_basic_test.py
+++ b/paimon-python/pypaimon/tests/reader_basic_test.py
@@ -20,6 +20,8 @@ import os
 import shutil
 import tempfile
 import unittest
+from datetime import datetime
+from unittest.mock import Mock
 
 import pandas as pd
 import pyarrow as pa
@@ -34,6 +36,7 @@ from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.write.file_store_commit import FileStoreCommit
 
 
 class ReaderBasicTest(unittest.TestCase):
@@ -156,6 +159,125 @@ class ReaderBasicTest(unittest.TestCase):
         pd.testing.assert_frame_equal(
             actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
 
+    def test_mixed_add_and_delete_entries_same_partition(self):
+        """Test record_count calculation with mixed ADD/DELETE entries in same partition."""
+        pa_schema = pa.schema([
+            ('region', pa.string()),
+            ('city', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.catalog.create_table('default.tb', schema, False)
+        table = self.catalog.get_table('default.tb')
+        partition_fields = [
+            DataField(0, "region", AtomicType("STRING")),
+            DataField(1, "city", AtomicType("STRING"))
+        ]
+        partition = GenericRow(['East', 'Boston'], partition_fields)
+
+        # Create ADD entry
+        add_file_meta = Mock(spec=DataFileMeta)
+        add_file_meta.row_count = 200
+        add_file_meta.file_size = 2048
+        add_file_meta.creation_time = datetime.now()
+
+        add_entry = ManifestEntry(
+            kind=0,  # ADD
+            partition=partition,
+            bucket=0,
+            total_buckets=1,
+            file=add_file_meta
+        )
+
+        # Create DELETE entry
+        delete_file_meta = Mock(spec=DataFileMeta)
+        delete_file_meta.row_count = 80
+        delete_file_meta.file_size = 800
+        delete_file_meta.creation_time = datetime.now()
+
+        delete_entry = ManifestEntry(
+            kind=1,  # DELETE
+            partition=partition,
+            bucket=0,
+            total_buckets=1,
+            file=delete_file_meta
+        )
+        file_store_commit = FileStoreCommit(None, table, "")
+        # Test the method with both entries
+        statistics = file_store_commit._generate_partition_statistics([add_entry, delete_entry])
+
+        # Verify results - should be merged into single partition statistics
+        self.assertEqual(len(statistics), 1)
+        stat = statistics[0]
+
+        # Net record count: +200 + (-80) = 120
+        self.assertEqual(stat.record_count, 120)
+        self.assertEqual(stat.file_count, 0)
+        self.assertEqual(stat.file_size_in_bytes, 1248)
+
+    def test_multiple_partitions_with_different_operations(self):
+        """Test record_count calculation across multiple partitions."""
+        pa_schema = pa.schema([
+            ('region', pa.string()),
+            ('city', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.catalog.create_table('default.tb1', schema, False)
+        table = self.catalog.get_table('default.tb1')
+        partition_fields = [
+            DataField(0, "region", AtomicType("STRING")),
+            DataField(1, "city", AtomicType("STRING"))
+        ]
+        partition1 = GenericRow(['East', 'Boston'], partition_fields)
+        file_meta1 = Mock(spec=DataFileMeta)
+        file_meta1.row_count = 150
+        file_meta1.file_size = 1500
+        file_meta1.creation_time = datetime.now()
+
+        entry1 = ManifestEntry(
+            kind=0,  # ADD
+            partition=partition1,
+            bucket=0,
+            total_buckets=1,
+            file=file_meta1
+        )
+
+        # Partition 2: South/LA - DELETE operation
+        partition2 = GenericRow(['South', 'LA'], partition_fields)
+        file_meta2 = Mock(spec=DataFileMeta)
+        file_meta2.row_count = 75
+        file_meta2.file_size = 750
+        file_meta2.creation_time = datetime.now()
+
+        entry2 = ManifestEntry(
+            kind=1,  # DELETE
+            partition=partition2,
+            bucket=0,
+            total_buckets=1,
+            file=file_meta2
+        )
+
+        file_store_commit = FileStoreCommit(None, table, "")
+        # Test the method with both entries
+        statistics = file_store_commit._generate_partition_statistics([entry1, entry2])
+
+        # Verify results - should have 2 separate partition statistics
+        self.assertEqual(len(statistics), 2)
+
+        # Sort by partition spec for consistent testing
+        statistics.sort(key=lambda s: (s.spec.get('region', ''), s.spec.get('city', '')))
+
+        # Check East/Boston partition (ADD)
+        north_stat = statistics[0]
+        self.assertEqual(north_stat.record_count, 150)  # Positive for ADD
+        self.assertEqual(north_stat.file_count, 1)
+        self.assertEqual(north_stat.file_size_in_bytes, 1500)
+
+        # Check South/LA partition (DELETE)
+        south_stat = statistics[1]
+        self.assertEqual(south_stat.record_count, -75)  # Negative for DELETE
+        self.assertEqual(south_stat.file_count, -1)
+        self.assertEqual(south_stat.file_size_in_bytes, -750)
+
     def testWriteWrongSchema(self):
         self.catalog.create_table('default.test_wrong_schema',
                                   Schema.from_pyarrow_schema(self.pa_schema),
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 8136117092..03c4d034d0 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -281,9 +281,9 @@ class FileStoreCommit:
             # Following Java implementation: PartitionEntry.fromDataFile()
             file_meta = entry.file
             # Extract actual file metadata (following Java DataFileMeta pattern)
-            record_count = file_meta.row_count
-            file_size_in_bytes = file_meta.file_size
-            file_count = 1
+            record_count = file_meta.row_count if entry.kind == 0 else file_meta.row_count * -1
+            file_size_in_bytes = file_meta.file_size if entry.kind == 0 else file_meta.file_size * -1
+            file_count = 1 if entry.kind == 0 else -1
 
             # Convert creation_time to milliseconds (Java uses epoch millis)
             if file_meta.creation_time:
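
For context, the fix makes DELETE manifest entries (kind == 1) contribute negative record, file, and size deltas, so adds and deletes in the same partition merge into one net statistic. The standalone sketch below is not the pypaimon implementation; the entry-kind values and field names are assumed from the diff and tests above. It only illustrates the aggregation the new tests assert:

from collections import defaultdict

# Entry kinds as used in the tests above (assumed): 0 = ADD, 1 = DELETE.
ADD, DELETE = 0, 1


def aggregate_partition_stats(entries):
    """Sum signed per-file deltas into one statistic per partition.

    DELETE entries contribute negative deltas, mirroring the behaviour
    the fixed _generate_partition_statistics is tested for above.
    """
    totals = defaultdict(lambda: {"record_count": 0,
                                  "file_count": 0,
                                  "file_size_in_bytes": 0})
    for kind, partition, row_count, file_size in entries:
        sign = 1 if kind == ADD else -1
        stat = totals[partition]
        stat["record_count"] += sign * row_count
        stat["file_count"] += sign
        stat["file_size_in_bytes"] += sign * file_size
    return dict(totals)


# Mirrors test_mixed_add_and_delete_entries_same_partition:
stats = aggregate_partition_stats([
    (ADD, ("East", "Boston"), 200, 2048),
    (DELETE, ("East", "Boston"), 80, 800),
])
assert stats[("East", "Boston")] == {
    "record_count": 120, "file_count": 0, "file_size_in_bytes": 1248}

Keeping the deltas signed lets a single per-partition sum reflect both appended and removed files without special-casing entry kinds downstream.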