This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ed39c17de2 [python] Fix DLF partition statistical error (#6237)
ed39c17de2 is described below

commit ed39c17de25d655586a02b5598262ce7d04ef052
Author: umi <55790489+discivig...@users.noreply.github.com>
AuthorDate: Thu Sep 11 13:20:23 2025 +0800

    [python] Fix DLF partition statistical error (#6237)
---
 .../pypaimon/tests/py36/ao_read_write_test.py      | 122 +++++++++++++++++++++
 paimon-python/pypaimon/tests/reader_basic_test.py  | 122 +++++++++++++++++++++
 paimon-python/pypaimon/write/file_store_commit.py  |   6 +-
 3 files changed, 247 insertions(+), 3 deletions(-)

diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
index 8f3201d056..9873d6bf3f 100644
--- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -16,6 +16,8 @@ See the License for the specific language governing permissions and
 limitations under the License.
 """
 import logging
+from datetime import datetime
+from unittest.mock import Mock
 
 import pandas as pd
 import pyarrow as pa
@@ -34,6 +36,7 @@ from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer, Gen
 from pypaimon.table.row.row_kind import RowKind
 from pypaimon.tests.py36.pyarrow_compat import table_sort_by
 from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
+from pypaimon.write.file_store_commit import FileStoreCommit
 
 
 class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
@@ -119,6 +122,125 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
         pd.testing.assert_frame_equal(
             actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
 
+    def test_mixed_add_and_delete_entries_same_partition(self):
+        """Test record_count calculation with mixed ADD/DELETE entries in same partition."""
+        pa_schema = pa.schema([
+            ('region', pa.string()),
+            ('city', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table('default.tb', schema, False)
+        table = self.rest_catalog.get_table('default.tb')
+        partition_fields = [
+            DataField(0, "region", AtomicType("STRING")),
+            DataField(1, "city", AtomicType("STRING"))
+        ]
+        partition = GenericRow(['East', 'Boston'], partition_fields)
+
+        # Create ADD entry
+        add_file_meta = Mock(spec=DataFileMeta)
+        add_file_meta.row_count = 200
+        add_file_meta.file_size = 2048
+        add_file_meta.creation_time = datetime.now()
+
+        add_entry = ManifestEntry(
+            kind=0,  # ADD
+            partition=partition,
+            bucket=0,
+            total_buckets=1,
+            file=add_file_meta
+        )
+
+        # Create DELETE entry
+        delete_file_meta = Mock(spec=DataFileMeta)
+        delete_file_meta.row_count = 80
+        delete_file_meta.file_size = 800
+        delete_file_meta.creation_time = datetime.now()
+
+        delete_entry = ManifestEntry(
+            kind=1,  # DELETE
+            partition=partition,
+            bucket=0,
+            total_buckets=1,
+            file=delete_file_meta
+        )
+        file_store_commit = FileStoreCommit(None, table, "")
+        # Test the method with both entries
+        statistics = file_store_commit._generate_partition_statistics([add_entry, delete_entry])
+
+        # Verify results - should be merged into single partition statistics
+        self.assertEqual(len(statistics), 1)
+        stat = statistics[0]
+
+        # Net record count: +200 + (-80) = 120
+        self.assertEqual(stat.record_count, 120)
+        self.assertEqual(stat.file_count, 0)
+        self.assertEqual(stat.file_size_in_bytes, 1248)
+
+    def test_multiple_partitions_with_different_operations(self):
+        """Test record_count calculation across multiple partitions."""
+        pa_schema = pa.schema([
+            ('region', pa.string()),
+            ('city', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table('default.tb1', schema, False)
+        table = self.rest_catalog.get_table('default.tb1')
+        partition_fields = [
+            DataField(0, "region", AtomicType("STRING")),
+            DataField(1, "city", AtomicType("STRING"))
+        ]
+        partition1 = GenericRow(['East', 'Boston'], partition_fields)
+        file_meta1 = Mock(spec=DataFileMeta)
+        file_meta1.row_count = 150
+        file_meta1.file_size = 1500
+        file_meta1.creation_time = datetime.now()
+
+        entry1 = ManifestEntry(
+            kind=0,  # ADD
+            partition=partition1,
+            bucket=0,
+            total_buckets=1,
+            file=file_meta1
+        )
+
+        # Partition 2: South/LA - DELETE operation
+        partition2 = GenericRow(['South', 'LA'], partition_fields)
+        file_meta2 = Mock(spec=DataFileMeta)
+        file_meta2.row_count = 75
+        file_meta2.file_size = 750
+        file_meta2.creation_time = datetime.now()
+
+        entry2 = ManifestEntry(
+            kind=1,  # DELETE
+            partition=partition2,
+            bucket=0,
+            total_buckets=1,
+            file=file_meta2
+        )
+
+        file_store_commit = FileStoreCommit(None, table, "")
+        # Test the method with both entries
+        statistics = file_store_commit._generate_partition_statistics([entry1, entry2])
+
+        # Verify results - should have 2 separate partition statistics
+        self.assertEqual(len(statistics), 2)
+
+        # Sort by partition spec for consistent testing
+        statistics.sort(key=lambda s: (s.spec.get('region', ''), s.spec.get('city', '')))
+
+        # Check East/Boston partition (ADD)
+        east_stat = statistics[0]
+        self.assertEqual(east_stat.record_count, 150)  # Positive for ADD
+        self.assertEqual(east_stat.file_count, 1)
+        self.assertEqual(east_stat.file_size_in_bytes, 1500)
+
+        # Check South/LA partition (DELETE)
+        south_stat = statistics[1]
+        self.assertEqual(south_stat.record_count, -75)  # Negative for DELETE
+        self.assertEqual(south_stat.file_count, -1)
+        self.assertEqual(south_stat.file_size_in_bytes, -750)
+
     def testParquetAppendOnlyReader(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
         self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
diff --git a/paimon-python/pypaimon/tests/reader_basic_test.py b/paimon-python/pypaimon/tests/reader_basic_test.py
index 8354ed206a..62255c8a79 100644
--- a/paimon-python/pypaimon/tests/reader_basic_test.py
+++ b/paimon-python/pypaimon/tests/reader_basic_test.py
@@ -20,6 +20,8 @@ import os
 import shutil
 import tempfile
 import unittest
+from datetime import datetime
+from unittest.mock import Mock
 
 import pandas as pd
 import pyarrow as pa
@@ -34,6 +36,7 @@ from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.write.file_store_commit import FileStoreCommit
 
 
 class ReaderBasicTest(unittest.TestCase):
@@ -156,6 +159,125 @@ class ReaderBasicTest(unittest.TestCase):
         pd.testing.assert_frame_equal(
             actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
 
+    def test_mixed_add_and_delete_entries_same_partition(self):
+        """Test record_count calculation with mixed ADD/DELETE entries in same partition."""
+        pa_schema = pa.schema([
+            ('region', pa.string()),
+            ('city', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.catalog.create_table('default.tb', schema, False)
+        table = self.catalog.get_table('default.tb')
+        partition_fields = [
+            DataField(0, "region", AtomicType("STRING")),
+            DataField(1, "city", AtomicType("STRING"))
+        ]
+        partition = GenericRow(['East', 'Boston'], partition_fields)
+
+        # Create ADD entry
+        add_file_meta = Mock(spec=DataFileMeta)
+        add_file_meta.row_count = 200
+        add_file_meta.file_size = 2048
+        add_file_meta.creation_time = datetime.now()
+
+        add_entry = ManifestEntry(
+            kind=0,  # ADD
+            partition=partition,
+            bucket=0,
+            total_buckets=1,
+            file=add_file_meta
+        )
+
+        # Create DELETE entry
+        delete_file_meta = Mock(spec=DataFileMeta)
+        delete_file_meta.row_count = 80
+        delete_file_meta.file_size = 800
+        delete_file_meta.creation_time = datetime.now()
+
+        delete_entry = ManifestEntry(
+            kind=1,  # DELETE
+            partition=partition,
+            bucket=0,
+            total_buckets=1,
+            file=delete_file_meta
+        )
+        file_store_commit = FileStoreCommit(None, table, "")
+        # Test the method with both entries
+        statistics = file_store_commit._generate_partition_statistics([add_entry, delete_entry])
+
+        # Verify results - should be merged into single partition statistics
+        self.assertEqual(len(statistics), 1)
+        stat = statistics[0]
+
+        # Net record count: +200 + (-80) = 120
+        self.assertEqual(stat.record_count, 120)
+        self.assertEqual(stat.file_count, 0)
+        self.assertEqual(stat.file_size_in_bytes, 1248)
+
+    def test_multiple_partitions_with_different_operations(self):
+        """Test record_count calculation across multiple partitions."""
+        pa_schema = pa.schema([
+            ('region', pa.string()),
+            ('city', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.catalog.create_table('default.tb1', schema, False)
+        table = self.catalog.get_table('default.tb1')
+        partition_fields = [
+            DataField(0, "region", AtomicType("STRING")),
+            DataField(1, "city", AtomicType("STRING"))
+        ]
+        partition1 = GenericRow(['East', 'Boston'], partition_fields)
+        file_meta1 = Mock(spec=DataFileMeta)
+        file_meta1.row_count = 150
+        file_meta1.file_size = 1500
+        file_meta1.creation_time = datetime.now()
+
+        entry1 = ManifestEntry(
+            kind=0,  # ADD
+            partition=partition1,
+            bucket=0,
+            total_buckets=1,
+            file=file_meta1
+        )
+
+        # Partition 2: South/LA - DELETE operation
+        partition2 = GenericRow(['South', 'LA'], partition_fields)
+        file_meta2 = Mock(spec=DataFileMeta)
+        file_meta2.row_count = 75
+        file_meta2.file_size = 750
+        file_meta2.creation_time = datetime.now()
+
+        entry2 = ManifestEntry(
+            kind=1,  # DELETE
+            partition=partition2,
+            bucket=0,
+            total_buckets=1,
+            file=file_meta2
+        )
+
+        file_store_commit = FileStoreCommit(None, table, "")
+        # Test the method with both entries
+        statistics = file_store_commit._generate_partition_statistics([entry1, entry2])
+
+        # Verify results - should have 2 separate partition statistics
+        self.assertEqual(len(statistics), 2)
+
+        # Sort by partition spec for consistent testing
+        statistics.sort(key=lambda s: (s.spec.get('region', ''), s.spec.get('city', '')))
+
+        # Check East/Boston partition (ADD)
+        east_stat = statistics[0]
+        self.assertEqual(east_stat.record_count, 150)  # Positive for ADD
+        self.assertEqual(east_stat.file_count, 1)
+        self.assertEqual(east_stat.file_size_in_bytes, 1500)
+
+        # Check South/LA partition (DELETE)
+        south_stat = statistics[1]
+        self.assertEqual(south_stat.record_count, -75)  # Negative for DELETE
+        self.assertEqual(south_stat.file_count, -1)
+        self.assertEqual(south_stat.file_size_in_bytes, -750)
+
     def testWriteWrongSchema(self):
         self.catalog.create_table('default.test_wrong_schema',
                                   Schema.from_pyarrow_schema(self.pa_schema),
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 8136117092..03c4d034d0 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -281,9 +281,9 @@ class FileStoreCommit:
             # Following Java implementation: PartitionEntry.fromDataFile()
             file_meta = entry.file
             # Extract actual file metadata (following Java DataFileMeta pattern)
-            record_count = file_meta.row_count
-            file_size_in_bytes = file_meta.file_size
-            file_count = 1
+            record_count = file_meta.row_count if entry.kind == 0 else file_meta.row_count * -1
+            file_size_in_bytes = file_meta.file_size if entry.kind == 0 else file_meta.file_size * -1
+            file_count = 1 if entry.kind == 0 else -1
 
             # Convert creation_time to milliseconds (Java uses epoch millis)
             if file_meta.creation_time:
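
The core of the patch is the sign convention above: ADD entries (kind == 0) contribute positive record, file, and byte counts, while DELETE entries (kind == 1) contribute negative ones, so the per-partition statistics net out. Below is a minimal, self-contained sketch of that convention; the Entry tuple and net_partition_stats helper are illustrative names for this sketch only, not part of the pypaimon API.

from collections import defaultdict
from typing import Dict, List, NamedTuple, Tuple


class Entry(NamedTuple):
    """Minimal stand-in for a manifest entry (illustrative only)."""
    kind: int              # 0 = ADD, 1 = DELETE
    partition: Tuple       # e.g. ('East', 'Boston')
    row_count: int
    file_size: int


def net_partition_stats(entries: List[Entry]) -> Dict[Tuple, Dict[str, int]]:
    """Aggregate statistics per partition, negating DELETE contributions."""
    stats: Dict[Tuple, Dict[str, int]] = defaultdict(
        lambda: {"record_count": 0, "file_count": 0, "file_size_in_bytes": 0})
    for e in entries:
        sign = 1 if e.kind == 0 else -1
        stats[e.partition]["record_count"] += sign * e.row_count
        stats[e.partition]["file_count"] += sign
        stats[e.partition]["file_size_in_bytes"] += sign * e.file_size
    return dict(stats)


# Mirrors test_mixed_add_and_delete_entries_same_partition above:
# ADD 200 rows / 2048 bytes, then DELETE 80 rows / 800 bytes in one partition.
result = net_partition_stats([
    Entry(kind=0, partition=('East', 'Boston'), row_count=200, file_size=2048),
    Entry(kind=1, partition=('East', 'Boston'), row_count=80, file_size=800),
])
assert result[('East', 'Boston')] == {
    "record_count": 120, "file_count": 0, "file_size_in_bytes": 1248}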
