This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 827e1815af [python] Support FileStoreCommit overwrite dynamic partition (#7749)
827e1815af is described below

commit 827e1815af04d0143a8327f7e8b4eaf0648b24f0
Author: Nicholas Jiang <[email protected]>
AuthorDate: Sat May 9 16:59:14 2026 +0800

    [python] Support FileStoreCommit overwrite dynamic partition (#7749)
    
    Support dynamic partition overwrite in `FileStoreCommit.overwrite()`,
    aligned with Java `FileStoreCommitImpl.overwritePartition()`:
    
    - When `dynamic-partition-overwrite=true` (default) and the table has
    partition keys, only partitions present in the committed data are
    replaced — other partitions remain untouched.
    - In this dynamic mode, when no data is committed, the overwrite is
    skipped entirely.
    - When `dynamic-partition-overwrite=false`, existing static overwrite
    behavior is unchanged.
    
    Key Changes:
    - Add `DYNAMIC_PARTITION_OVERWRITE` config option and accessor in
    `CoreOptions`.
    - Restructure `overwrite()` with `skip_overwrite` / `partition_filter`
    control flow mirroring Java's `skipOverwrite` / `partitionFilter`.
    - Add `_create_dynamic_partition_filter()` mirroring
    `PartitionPredicate.fromMultiple`.
    - Add `_create_static_partition_filter()` mirroring
    `createPartitionPredicate` + `fromPredicate`.
---
 .../pypaimon/common/options/core_options.py        |  13 +++
 .../pypaimon/tests/partition_predicate_test.py     | 100 +++++++++++++++++++-
 .../pypaimon/tests/py36/rest_ao_read_write_test.py |  65 +++++++++++++
 paimon-python/pypaimon/tests/reader_base_test.py   |  65 +++++++++++++
 .../pypaimon/tests/rest/rest_read_write_test.py    |  65 +++++++++++++
 paimon-python/pypaimon/tests/table_commit_test.py  | 102 +++++++++++++++++++++
 .../pypaimon/write/commit/commit_scanner.py        |   5 +-
 paimon-python/pypaimon/write/file_store_commit.py  |  94 +++++++++++++------
 paimon-python/pypaimon/write/table_commit.py       |  19 ++--
 9 files changed, 492 insertions(+), 36 deletions(-)

diff --git a/paimon-python/pypaimon/common/options/core_options.py 
b/paimon-python/pypaimon/common/options/core_options.py
index 6a4b51c8c3..3fe2e79455 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -442,6 +442,16 @@ class CoreOptions:
         )
     )
 
+    DYNAMIC_PARTITION_OVERWRITE: ConfigOption[bool] = (
+        ConfigOptions.key("dynamic-partition-overwrite")
+        .boolean_type()
+        .default_value(True)
+        .with_description(
+            "Whether only overwrite dynamic partition when overwriting a 
partitioned table "
+            "with dynamic partition columns. Works only when the table has 
partition keys."
+        )
+    )
+
     def __init__(self, options: Options):
         self.options = options
 
@@ -622,3 +632,6 @@ class CoreOptions:
 
     def add_column_before_partition(self) -> bool:
         return self.options.get(CoreOptions.ADD_COLUMN_BEFORE_PARTITION, False)
+
+    def dynamic_partition_overwrite(self) -> bool:
+        return self.options.get(CoreOptions.DYNAMIC_PARTITION_OVERWRITE)
diff --git a/paimon-python/pypaimon/tests/partition_predicate_test.py 
b/paimon-python/pypaimon/tests/partition_predicate_test.py
index 1dbc9a150f..f4fe38f767 100644
--- a/paimon-python/pypaimon/tests/partition_predicate_test.py
+++ b/paimon-python/pypaimon/tests/partition_predicate_test.py
@@ -52,6 +52,7 @@ def _mock_table():
     table.fields = TABLE_FIELDS
     table.partition_keys = ['dt', 'region']
     table.partition_keys_fields = PARTITION_FIELDS
+    table.options.options.get = Mock(return_value="__DEFAULT_PARTITION__")
     return table
 
 
@@ -181,9 +182,9 @@ class TestOverwritePartitionPredicate(unittest.TestCase):
             return mock_cls.call_args[1]['partition_predicate']
 
     def test_overwrite_rejects_mismatched_partition(self, *_):
-        commit = self._create_commit(stub_commit=False)
+        commit = self._create_commit()
         with self.assertRaises(RuntimeError) as ctx:
-            commit.overwrite(self._TARGET, [self._msg(('2024-01-15', 
'us-west-2'))], 1)
+            commit._create_static_partition_filter(self._TARGET, 
[self._msg(('2024-01-15', 'us-west-2'))])
         self.assertIn('does not belong to this partition', str(ctx.exception))
 
     def test_overwrite_passes_partition_scoped_predicate(self, *_):
@@ -206,6 +207,89 @@ class TestOverwritePartitionPredicate(unittest.TestCase):
         self.assertTrue(pred.test(OffsetRow(('2024-01-16', 'us-west-2'), 0, 
2)))
         self.assertFalse(pred.test(OffsetRow(('2024-01-17', 'eu-west-1'), 0, 
2)))
 
+    def test_overwrite_null_partition_value(self, *_):
+        """Test that overwrite with None partition value uses isNull 
predicate."""
+        commit = self._create_commit()
+        target = {'dt': None, 'region': 'us-east-1'}
+        commit.overwrite(target, [self._msg((None, 'us-east-1'))], 1)
+
+        pred = self._extract_partition_predicate(commit)
+        # Should match rows where dt is None and region is 'us-east-1'
+        self.assertTrue(pred.test(OffsetRow((None, 'us-east-1'), 0, 2)))
+        # Should not match rows where dt has a value
+        self.assertFalse(pred.test(OffsetRow(('2024-01-15', 'us-east-1'), 0, 
2)))
+        # Should not match rows where region differs
+        self.assertFalse(pred.test(OffsetRow((None, 'us-west-2'), 0, 2)))
+
+    def test_overwrite_default_partition_name_treated_as_null(self, *_):
+        """Test that overwrite with default partition name string is treated 
as null."""
+        commit = self._create_commit()
+        target = {'dt': '__DEFAULT_PARTITION__', 'region': 'us-east-1'}
+        commit.overwrite(target, [self._msg((None, 'us-east-1'))], 1)
+
+        pred = self._extract_partition_predicate(commit)
+        # __DEFAULT_PARTITION__ should be treated like None (isNull)
+        self.assertTrue(pred.test(OffsetRow((None, 'us-east-1'), 0, 2)))
+        self.assertFalse(pred.test(OffsetRow(('2024-01-15', 'us-east-1'), 0, 
2)))
+
+    def test_overwrite_all_null_partition_values(self, *_):
+        """Test overwrite where all partition values are None."""
+        commit = self._create_commit()
+        target = {'dt': None, 'region': None}
+        commit.overwrite(target, [self._msg((None, None))], 1)
+
+        pred = self._extract_partition_predicate(commit)
+        self.assertTrue(pred.test(OffsetRow((None, None), 0, 2)))
+        self.assertFalse(pred.test(OffsetRow((None, 'us-east-1'), 0, 2)))
+        self.assertFalse(pred.test(OffsetRow(('2024-01-15', None), 0, 2)))
+
+    def test_overwrite_null_partition_rejects_mismatched(self, *_):
+        """Test that overwrite with null partition rejects rows that don't 
match."""
+        commit = self._create_commit()
+        target = {'dt': None, 'region': 'us-east-1'}
+        # Trying to overwrite null dt partition with data that has a non-null 
dt
+        with self.assertRaises(RuntimeError) as ctx:
+            commit._create_static_partition_filter(target, 
[self._msg(('2024-01-15', 'us-east-1'))])
+        self.assertIn('does not belong to this partition', str(ctx.exception))
+
+    def test_dynamic_overwrite_null_partition_value(self, *_):
+        """Test dynamic partition overwrite with None partition values."""
+        commit = self._create_commit()
+        self.table.options.dynamic_partition_overwrite.return_value = True
+        commit.overwrite({}, [self._msg((None, 'us-east-1'))], 1)
+
+        pred = self._extract_partition_predicate(commit)
+        self.assertTrue(pred.test(OffsetRow((None, 'us-east-1'), 0, 2)))
+        self.assertFalse(pred.test(OffsetRow(('2024-01-15', 'us-east-1'), 0, 
2)))
+
+    def test_dynamic_overwrite_mixed_null_and_nonnull(self, *_):
+        """Test dynamic partition overwrite with both null and non-null 
partitions."""
+        commit = self._create_commit()
+        self.table.options.dynamic_partition_overwrite.return_value = True
+        commit.overwrite({}, [
+            self._msg(('2024-01-15', 'us-east-1')),
+            self._msg((None, 'us-west-2')),
+        ], 1)
+
+        pred = self._extract_partition_predicate(commit)
+        self.assertTrue(pred.test(OffsetRow(('2024-01-15', 'us-east-1'), 0, 
2)))
+        self.assertTrue(pred.test(OffsetRow((None, 'us-west-2'), 0, 2)))
+        self.assertFalse(pred.test(OffsetRow(('2024-01-16', 'eu-west-1'), 0, 
2)))
+
+    def test_drop_partitions_null_partition_value(self, *_):
+        """Test drop_partitions with default partition name string 
representing null."""
+        commit = self._create_commit()
+        commit.drop_partitions([
+            {'dt': '__DEFAULT_PARTITION__', 'region': 'us-east-1'},
+            {'dt': '2024-01-16', 'region': 'us-west-2'},
+        ], 1)
+
+        pred = self._extract_partition_predicate(commit)
+        self.assertTrue(pred.test(OffsetRow((None, 'us-east-1'), 0, 2)))
+        self.assertTrue(pred.test(OffsetRow(('2024-01-16', 'us-west-2'), 0, 
2)))
+        self.assertFalse(pred.test(OffsetRow(('2024-01-15', 'us-east-1'), 0, 
2)))
+        self.assertFalse(pred.test(OffsetRow((None, 'us-west-2'), 0, 2)))
+
 
 class TestCommitScannerPartitionPredicate(unittest.TestCase):
 
@@ -223,6 +307,18 @@ class 
TestCommitScannerPartitionPredicate(unittest.TestCase):
         self.assertTrue(pred.test(GenericRow(['2024-01-16', 'us-west-2'], 
PARTITION_FIELDS)))
         self.assertFalse(pred.test(GenericRow(['2024-01-17', 'eu-west-1'], 
PARTITION_FIELDS)))
 
+    def test_filter_handles_null_partition_values(self):
+        scanner = self._scanner()
+        pred = scanner._build_partition_filter_from_entries([
+            _manifest_entry([None, 'us-east-1']),
+            _manifest_entry(['2024-01-16', 'us-west-2']),
+        ])
+
+        self.assertTrue(pred.test(GenericRow([None, 'us-east-1'], 
PARTITION_FIELDS)))
+        self.assertTrue(pred.test(GenericRow(['2024-01-16', 'us-west-2'], 
PARTITION_FIELDS)))
+        self.assertFalse(pred.test(GenericRow(['2024-01-15', 'us-east-1'], 
PARTITION_FIELDS)))
+        self.assertFalse(pred.test(GenericRow([None, 'us-west-2'], 
PARTITION_FIELDS)))
+
     def test_filter_none_without_partition_keys(self):
         scanner = CommitScanner(Mock(partition_keys=[]), Mock())
 
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py 
b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index 5258aec1d0..2a39628136 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -128,6 +128,71 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
         pd.testing.assert_frame_equal(
             actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
 
+    def test_dynamic_partition_overwrite(self):
+        pa_schema = pa.schema([
+            ('f0', pa.string()),
+            ('f1', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0'])
+        self.rest_catalog.create_table('default.test_dynamic_overwrite', 
schema, False)
+        table = self.rest_catalog.get_table('default.test_dynamic_overwrite')
+        read_builder = table.new_read_builder()
+
+        # Write initial non-null and null partitions
+        self._batch_write(table, pd.DataFrame({
+            'f0': ['a', 'b', None],
+            'f1': ['apple', 'banana', 'cherry'],
+        }))
+
+        # Dynamic overwrite partition f0='a' only; 'b' and null untouched
+        self._batch_overwrite(table, pd.DataFrame({
+            'f0': ['a'],
+            'f1': ['watermelon'],
+        }))
+
+        self._assert_table_equals(read_builder, pd.DataFrame({
+            'f0': ['a', 'b', None],
+            'f1': ['watermelon', 'banana', 'cherry'],
+        }), sort_by='f0')
+
+        # Dynamic overwrite partitions f0='a' and f0=None; 'b' untouched
+        self._batch_overwrite(table, pd.DataFrame({
+            'f0': ['a', None],
+            'f1': ['mango', 'grape'],
+        }))
+
+        self._assert_table_equals(read_builder, pd.DataFrame({
+            'f0': ['a', 'b', None],
+            'f1': ['mango', 'banana', 'grape'],
+        }), sort_by='f0')
+
+    def _batch_write(self, table, df):
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_pandas(df)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+    def _batch_overwrite(self, table, df, partition=None):
+        write_builder = table.new_batch_write_builder().overwrite(partition)
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_pandas(df)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+    def _assert_table_equals(self, read_builder, expected_df, sort_by=None):
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df = table_read.to_pandas(table_scan.plan().splits())
+        if sort_by:
+            actual_df = actual_df.sort_values(by=sort_by)
+        pd.testing.assert_frame_equal(
+            actual_df.reset_index(drop=True), 
expected_df.reset_index(drop=True))
+
     def test_full_data_types(self):
         simple_pa_schema = pa.schema([
             ('f0', pa.int8()),
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py 
b/paimon-python/pypaimon/tests/reader_base_test.py
index f1b147d147..e64d4ab35c 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -162,6 +162,71 @@ class ReaderBasicTest(unittest.TestCase):
         pd.testing.assert_frame_equal(
             actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
 
+    def test_dynamic_partition_overwrite(self):
+        pa_schema = pa.schema([
+            ('f0', pa.string()),
+            ('f1', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0'])
+        self.catalog.create_table('default.test_dynamic_overwrite', schema, 
False)
+        table = self.catalog.get_table('default.test_dynamic_overwrite')
+        read_builder = table.new_read_builder()
+
+        # Write initial non-null and null partitions
+        self._batch_write(table, pd.DataFrame({
+            'f0': ['a', 'b', None],
+            'f1': ['apple', 'banana', 'cherry'],
+        }))
+
+        # Dynamic overwrite partition f0='a' only; 'b' and null untouched
+        self._batch_overwrite(table, pd.DataFrame({
+            'f0': ['a'],
+            'f1': ['watermelon'],
+        }))
+
+        self._assert_table_equals(read_builder, pd.DataFrame({
+            'f0': ['a', 'b', None],
+            'f1': ['watermelon', 'banana', 'cherry'],
+        }), sort_by='f0')
+
+        # Dynamic overwrite partitions f0='a' and f0=None; 'b' untouched
+        self._batch_overwrite(table, pd.DataFrame({
+            'f0': ['a', None],
+            'f1': ['mango', 'grape'],
+        }))
+
+        self._assert_table_equals(read_builder, pd.DataFrame({
+            'f0': ['a', 'b', None],
+            'f1': ['mango', 'banana', 'grape'],
+        }), sort_by='f0')
+
+    def _batch_write(self, table, df):
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_pandas(df)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+    def _batch_overwrite(self, table, df, partition=None):
+        write_builder = table.new_batch_write_builder().overwrite(partition)
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_pandas(df)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+    def _assert_table_equals(self, read_builder, expected_df, sort_by=None):
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df = table_read.to_pandas(table_scan.plan().splits())
+        if sort_by:
+            actual_df = actual_df.sort_values(by=sort_by)
+        pd.testing.assert_frame_equal(
+            actual_df.reset_index(drop=True), 
expected_df.reset_index(drop=True))
+
     def test_full_data_types(self):
         simple_pa_schema = pa.schema([
             ('f0', pa.int8()),
diff --git a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py 
b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
index 455cc6cdf9..ce4fabda19 100644
--- a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
@@ -116,6 +116,71 @@ class RESTTableReadWriteTest(RESTBaseTest):
         pd.testing.assert_frame_equal(
             actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
 
+    def test_dynamic_partition_overwrite(self):
+        pa_schema = pa.schema([
+            ('f0', pa.string()),
+            ('f1', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0'])
+        self.rest_catalog.create_table('default.test_dynamic_overwrite', 
schema, False)
+        table = self.rest_catalog.get_table('default.test_dynamic_overwrite')
+        read_builder = table.new_read_builder()
+
+        # Write initial non-null and null partitions
+        self._batch_write(table, pd.DataFrame({
+            'f0': ['a', 'b', None],
+            'f1': ['apple', 'banana', 'cherry'],
+        }))
+
+        # Dynamic overwrite partition f0='a' only; 'b' and null untouched
+        self._batch_overwrite(table, pd.DataFrame({
+            'f0': ['a'],
+            'f1': ['watermelon'],
+        }))
+
+        self._assert_table_equals(read_builder, pd.DataFrame({
+            'f0': ['a', 'b', None],
+            'f1': ['watermelon', 'banana', 'cherry'],
+        }), sort_by='f0')
+
+        # Dynamic overwrite partitions f0='a' and f0=None; 'b' untouched
+        self._batch_overwrite(table, pd.DataFrame({
+            'f0': ['a', None],
+            'f1': ['mango', 'grape'],
+        }))
+
+        self._assert_table_equals(read_builder, pd.DataFrame({
+            'f0': ['a', 'b', None],
+            'f1': ['mango', 'banana', 'grape'],
+        }), sort_by='f0')
+
+    def _batch_write(self, table, df):
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_pandas(df)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+    def _batch_overwrite(self, table, df, partition=None):
+        write_builder = table.new_batch_write_builder().overwrite(partition)
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_pandas(df)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+    def _assert_table_equals(self, read_builder, expected_df, sort_by=None):
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df = table_read.to_pandas(table_scan.plan().splits())
+        if sort_by:
+            actual_df = actual_df.sort_values(by=sort_by)
+        pd.testing.assert_frame_equal(
+            actual_df.reset_index(drop=True), 
expected_df.reset_index(drop=True))
+
     def test_parquet_ao_reader(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['dt'])
         self.rest_catalog.create_table('default.test_append_only_parquet', 
schema, False)
diff --git a/paimon-python/pypaimon/tests/table_commit_test.py 
b/paimon-python/pypaimon/tests/table_commit_test.py
new file mode 100644
index 0000000000..87fb6a50dd
--- /dev/null
+++ b/paimon-python/pypaimon/tests/table_commit_test.py
@@ -0,0 +1,102 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import unittest
+from unittest.mock import Mock
+
+from parameterized import parameterized
+
+from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+from pypaimon.write.commit_message import CommitMessage
+from pypaimon.write.table_commit import BatchTableCommit, StreamTableCommit
+
+
+class TestTableCommitEmptyOverwrite(unittest.TestCase):
+    """Tests for TableCommit._commit handling of empty commit messages in 
overwrite mode."""
+
+    def _create_commit(self, cls, overwrite_partition=None):
+        commit = cls.__new__(cls)
+        commit.table = Mock()
+        commit.table.identifier = 'default.test_table'
+        commit.commit_user = 'test_user'
+        commit.overwrite_partition = overwrite_partition
+        commit.file_store_commit = Mock()
+        commit.batch_committed = False
+        return commit, commit.file_store_commit
+
+    # -- Overwrite mode: should always call overwrite(), even with empty 
messages --
+
+    @parameterized.expand([
+        ("no_messages", []),
+        ("all_empty", [False]),
+        ("non_empty", [True]),
+        ("mixed", [False, True]),
+    ])
+    def test_overwrite_forwards_filtered_messages(self, name, msg_flags):
+        """Overwrite mode should always call overwrite(), filtering out empty 
messages."""
+        commit, mock_fsc = self._create_commit(BatchTableCommit, 
overwrite_partition={'f0': 1})
+
+        messages = [
+            CommitMessage(partition=(1,), bucket=0, new_files=[Mock()] if 
has_files else [])
+            for has_files in msg_flags
+        ]
+        commit.commit(messages)
+
+        mock_fsc.overwrite.assert_called_once_with(
+            overwrite_partition={'f0': 1},
+            commit_messages=[m for m in messages if not m.is_empty()],
+            commit_identifier=BATCH_COMMIT_IDENTIFIER,
+        )
+
+    # -- Append mode: should only call commit() when there are non-empty 
messages --
+
+    @parameterized.expand([
+        ("no_messages", []),
+        ("all_empty", [False]),
+        ("non_empty", [True]),
+    ])
+    def test_append_forwards_non_empty_messages(self, name, msg_flags):
+        """Append mode should only call commit() when there are non-empty 
messages."""
+        commit, mock_fsc = self._create_commit(BatchTableCommit, 
overwrite_partition=None)
+
+        messages = [
+            CommitMessage(partition=(), bucket=0, new_files=[Mock()] if 
has_files else [])
+            for has_files in msg_flags
+        ]
+        commit.commit(messages)
+
+        if any(msg_flags):
+            mock_fsc.commit.assert_called_once_with(
+                commit_messages=[m for m in messages if not m.is_empty()],
+                commit_identifier=BATCH_COMMIT_IDENTIFIER,
+            )
+        else:
+            mock_fsc.commit.assert_not_called()
+            mock_fsc.overwrite.assert_not_called()
+
+    # -- StreamTableCommit overwrite should also reach overwrite() with empty 
messages --
+
+    def test_stream_commit_overwrite_empty_messages(self):
+        commit, mock_fsc = self._create_commit(StreamTableCommit, 
overwrite_partition={'dt': '2024-01-15'})
+
+        commit.commit([], commit_identifier=42)
+
+        mock_fsc.overwrite.assert_called_once_with(
+            overwrite_partition={'dt': '2024-01-15'},
+            commit_messages=[],
+            commit_identifier=42,
+        )
diff --git a/paimon-python/pypaimon/write/commit/commit_scanner.py 
b/paimon-python/pypaimon/write/commit/commit_scanner.py
index 300215f774..fe161c9a95 100644
--- a/paimon-python/pypaimon/write/commit/commit_scanner.py
+++ b/paimon-python/pypaimon/write/commit/commit_scanner.py
@@ -121,7 +121,10 @@ class CommitScanner:
         for partition_values in changed_partitions:
             sub_predicates = []
             for i, key in enumerate(partition_keys):
-                sub_predicates.append(predicate_builder.equal(key, 
partition_values[i]))
+                if partition_values[i] is None:
+                    sub_predicates.append(predicate_builder.is_null(key))
+                else:
+                    sub_predicates.append(predicate_builder.equal(key, 
partition_values[i]))
             
partition_predicates.append(predicate_builder.and_predicates(sub_predicates))
 
         return predicate_builder.or_predicates(partition_predicates)
diff --git a/paimon-python/pypaimon/write/file_store_commit.py 
b/paimon-python/pypaimon/write/file_store_commit.py
index 6eb7f5f5da..4c4338d620 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -22,6 +22,7 @@ import time
 import uuid
 from typing import Dict, List, Optional
 
+from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.common.predicate_builder import PredicateBuilder
 from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
@@ -158,37 +159,33 @@ class FileStoreCommit:
 
     def overwrite(self, overwrite_partition, commit_messages: 
List[CommitMessage], commit_identifier: int):
         """Commit the given commit messages in overwrite mode."""
-        if not commit_messages:
-            return
-
         logger.info(
             "Ready to overwrite to table %s, number of commit messages: %d",
             self.table.identifier,
             len(commit_messages),
         )
+        skip_overwrite = False
         partition_filter = None
-        # sanity check, all changes must be done within the given partition, 
meanwhile build a partition filter
-        if len(overwrite_partition) > 0:
-            predicate_builder = 
PredicateBuilder(self.table.partition_keys_fields)
-            sub_predicates = []
-            for key, value in overwrite_partition.items():
-                sub_predicates.append(predicate_builder.equal(key, value))
-            partition_filter = predicate_builder.and_predicates(sub_predicates)
 
-            for msg in commit_messages:
-                row = OffsetRow(msg.partition, 0, len(msg.partition))
-                if not partition_filter.test(row):
-                    raise RuntimeError(f"Trying to overwrite partition 
{overwrite_partition}, but the changes "
-                                       f"in {msg.partition} does not belong to 
this partition")
+        # Partition filter is built from dynamic or static partition according 
to options.
+        if len(self.table.partition_keys) > 0 and 
self.table.options.dynamic_partition_overwrite():
+            if not commit_messages:
+                # In dynamic mode, if there are no changes to commit, no data 
will be deleted
+                skip_overwrite = True
+            else:
+                partition_filter = 
self._create_dynamic_partition_filter(commit_messages)
+        else:
+            partition_filter = 
self._create_static_partition_filter(overwrite_partition, commit_messages)
 
-        self._try_commit(
-            commit_kind="OVERWRITE",
-            commit_identifier=commit_identifier,
-            commit_entries_plan=lambda snapshot: 
self._generate_overwrite_entries(
-                snapshot, partition_filter, commit_messages),
-            detect_conflicts=True,
-            allow_rollback=False,
-        )
+        if not skip_overwrite:
+            self._try_commit(
+                commit_kind="OVERWRITE",
+                commit_identifier=commit_identifier,
+                commit_entries_plan=lambda snapshot: 
self._generate_overwrite_entries(
+                    snapshot, partition_filter, commit_messages),
+                detect_conflicts=True,
+                allow_rollback=False,
+            )
 
     def drop_partitions(self, partitions: List[Dict[str, str]], 
commit_identifier: int) -> None:
         if not partitions:
@@ -204,12 +201,16 @@ class FileStoreCommit:
                     )
 
         predicate_builder = PredicateBuilder(self.table.partition_keys_fields)
+        default_part_value = self.table.options.options.get(
+            CoreOptions.PARTITION_DEFAULT_NAME, "__DEFAULT_PARTITION__")
         partition_predicates = []
         for part in partitions:
-            sub_predicates = [
-                predicate_builder.equal(key, value)
-                for key, value in part.items()
-            ]
+            sub_predicates = []
+            for key, value in part.items():
+                if value is None or (isinstance(value, str) and value == 
default_part_value):
+                    sub_predicates.append(predicate_builder.is_null(key))
+                else:
+                    sub_predicates.append(predicate_builder.equal(key, value))
             if sub_predicates:
                 pred = predicate_builder.and_predicates(sub_predicates)
                 if pred is not None:
@@ -517,6 +518,45 @@ class FileStoreCommit:
                     return True
         return False
 
+    def _create_dynamic_partition_filter(self, commit_messages: 
List[CommitMessage]):
+        """Build a partition filter from the unique partitions present in 
commit_messages."""
+        predicate_builder = PredicateBuilder(self.table.partition_keys_fields)
+        predicates = []
+        seen_partitions = set()
+        for msg in commit_messages:
+            partition_values = tuple(msg.partition)
+            if partition_values not in seen_partitions:
+                seen_partitions.add(partition_values)
+                equalities = []
+                for name, value in zip(self.table.partition_keys, 
msg.partition):
+                    if value is None:
+                        equalities.append(predicate_builder.is_null(name))
+                    else:
+                        equalities.append(predicate_builder.equal(name, value))
+                predicates.append(predicate_builder.and_predicates(equalities))
+        return predicate_builder.or_predicates(predicates)
+
+    def _create_static_partition_filter(self, overwrite_partition, 
commit_messages: List[CommitMessage]):
+        """Build a partition filter from the explicit overwrite_partition 
spec."""
+        if not overwrite_partition:
+            return None
+        predicate_builder = PredicateBuilder(self.table.partition_keys_fields)
+        default_part_value = self.table.options.options.get(
+            CoreOptions.PARTITION_DEFAULT_NAME, "__DEFAULT_PARTITION__")
+        equalities = []
+        for key, value in overwrite_partition.items():
+            if value is None or (isinstance(value, str) and value == 
default_part_value):
+                equalities.append(predicate_builder.is_null(key))
+            else:
+                equalities.append(predicate_builder.equal(key, value))
+        partition_filter = predicate_builder.and_predicates(equalities)
+        for msg in commit_messages:
+            row = OffsetRow(msg.partition, 0, len(msg.partition))
+            if not partition_filter.test(row):
+                raise RuntimeError(f"Trying to overwrite partition 
{overwrite_partition}, but the changes "
+                                   f"in {msg.partition} does not belong to 
this partition")
+        return partition_filter
+
     def _generate_overwrite_entries(self, latest_snapshot, partition_filter, 
commit_messages):
         """Generate commit entries for OVERWRITE mode based on latest 
snapshot."""
         entries = []
diff --git a/paimon-python/pypaimon/write/table_commit.py 
b/paimon-python/pypaimon/write/table_commit.py
index 19918c1782..bafaeddf6c 100644
--- a/paimon-python/pypaimon/write/table_commit.py
+++ b/paimon-python/pypaimon/write/table_commit.py
@@ -48,20 +48,27 @@ class TableCommit:
         self._check_committed()
 
         non_empty_messages = [msg for msg in commit_messages if not 
msg.is_empty()]
-        if not non_empty_messages:
-            return
 
-        logger.info(
-            "Committing batch table %s, %d non-empty messages",
-            self.table.identifier, len(non_empty_messages)
-        )
         if self.overwrite_partition is not None:
+            # Always call overwrite() even with empty messages, so that
+            # FileStoreCommit.overwrite can handle the empty case properly
+            # (e.g. static overwrite with empty data should delete the 
partition).
+            logger.info(
+                "Committing overwrite to table %s, %d non-empty messages",
+                self.table.identifier, len(non_empty_messages)
+            )
             self.file_store_commit.overwrite(
                 overwrite_partition=self.overwrite_partition,
                 commit_messages=non_empty_messages,
                 commit_identifier=commit_identifier
             )
         else:
+            if not non_empty_messages:
+                return
+            logger.info(
+                "Committing batch table %s, %d non-empty messages",
+                self.table.identifier, len(non_empty_messages)
+            )
             self.file_store_commit.commit(
                 commit_messages=non_empty_messages,
                 commit_identifier=commit_identifier

Reply via email to