This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 827e1815af [python] Support FileStoreCommit overwrite dynamic
partition (#7749)
827e1815af is described below
commit 827e1815af04d0143a8327f7e8b4eaf0648b24f0
Author: Nicholas Jiang <[email protected]>
AuthorDate: Sat May 9 16:59:14 2026 +0800
[python] Support FileStoreCommit overwrite dynamic partition (#7749)
Support dynamic partition overwrite in `FileStoreCommit.overwrite()`,
aligned with Java `FileStoreCommitImpl.overwritePartition()`:
- When `dynamic-partition-overwrite=true` (default), only partitions
present in the committed data are replaced — other partitions remain
untouched.
- When no data is committed in dynamic mode, the overwrite is skipped
entirely.
- When `dynamic-partition-overwrite=false`, existing static overwrite
behavior is unchanged.
Key Changes:
- Add `DYNAMIC_PARTITION_OVERWRITE` config option and accessor in
`CoreOptions`.
- Restructure `overwrite()` with `skip_overwrite` / `partition_filter`
control flow mirroring Java's `skipOverwrite` / `partitionFilter`.
- Add `_create_dynamic_partition_filter()` mirroring
`PartitionPredicate.fromMultiple`.
- Add `_create_static_partition_filter()` mirroring
`createPartitionPredicate` + `fromPredicate`.
---
.../pypaimon/common/options/core_options.py | 13 +++
.../pypaimon/tests/partition_predicate_test.py | 100 +++++++++++++++++++-
.../pypaimon/tests/py36/rest_ao_read_write_test.py | 65 +++++++++++++
paimon-python/pypaimon/tests/reader_base_test.py | 65 +++++++++++++
.../pypaimon/tests/rest/rest_read_write_test.py | 65 +++++++++++++
paimon-python/pypaimon/tests/table_commit_test.py | 102 +++++++++++++++++++++
.../pypaimon/write/commit/commit_scanner.py | 5 +-
paimon-python/pypaimon/write/file_store_commit.py | 94 +++++++++++++------
paimon-python/pypaimon/write/table_commit.py | 19 ++--
9 files changed, 492 insertions(+), 36 deletions(-)
diff --git a/paimon-python/pypaimon/common/options/core_options.py
b/paimon-python/pypaimon/common/options/core_options.py
index 6a4b51c8c3..3fe2e79455 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -442,6 +442,16 @@ class CoreOptions:
)
)
+ DYNAMIC_PARTITION_OVERWRITE: ConfigOption[bool] = (
+ ConfigOptions.key("dynamic-partition-overwrite")
+ .boolean_type()
+ .default_value(True)
+ .with_description(
+ "Whether only overwrite dynamic partition when overwriting a
partitioned table "
+ "with dynamic partition columns. Works only when the table has
partition keys."
+ )
+ )
+
def __init__(self, options: Options):
self.options = options
@@ -622,3 +632,6 @@ class CoreOptions:
def add_column_before_partition(self) -> bool:
return self.options.get(CoreOptions.ADD_COLUMN_BEFORE_PARTITION, False)
+
+ def dynamic_partition_overwrite(self) -> bool:
+ return self.options.get(CoreOptions.DYNAMIC_PARTITION_OVERWRITE)
diff --git a/paimon-python/pypaimon/tests/partition_predicate_test.py
b/paimon-python/pypaimon/tests/partition_predicate_test.py
index 1dbc9a150f..f4fe38f767 100644
--- a/paimon-python/pypaimon/tests/partition_predicate_test.py
+++ b/paimon-python/pypaimon/tests/partition_predicate_test.py
@@ -52,6 +52,7 @@ def _mock_table():
table.fields = TABLE_FIELDS
table.partition_keys = ['dt', 'region']
table.partition_keys_fields = PARTITION_FIELDS
+ table.options.options.get = Mock(return_value="__DEFAULT_PARTITION__")
return table
@@ -181,9 +182,9 @@ class TestOverwritePartitionPredicate(unittest.TestCase):
return mock_cls.call_args[1]['partition_predicate']
def test_overwrite_rejects_mismatched_partition(self, *_):
- commit = self._create_commit(stub_commit=False)
+ commit = self._create_commit()
with self.assertRaises(RuntimeError) as ctx:
- commit.overwrite(self._TARGET, [self._msg(('2024-01-15',
'us-west-2'))], 1)
+ commit._create_static_partition_filter(self._TARGET,
[self._msg(('2024-01-15', 'us-west-2'))])
self.assertIn('does not belong to this partition', str(ctx.exception))
def test_overwrite_passes_partition_scoped_predicate(self, *_):
@@ -206,6 +207,89 @@ class TestOverwritePartitionPredicate(unittest.TestCase):
self.assertTrue(pred.test(OffsetRow(('2024-01-16', 'us-west-2'), 0,
2)))
self.assertFalse(pred.test(OffsetRow(('2024-01-17', 'eu-west-1'), 0,
2)))
+ def test_overwrite_null_partition_value(self, *_):
+ """Test that overwrite with None partition value uses isNull
predicate."""
+ commit = self._create_commit()
+ target = {'dt': None, 'region': 'us-east-1'}
+ commit.overwrite(target, [self._msg((None, 'us-east-1'))], 1)
+
+ pred = self._extract_partition_predicate(commit)
+ # Should match rows where dt is None and region is 'us-east-1'
+ self.assertTrue(pred.test(OffsetRow((None, 'us-east-1'), 0, 2)))
+ # Should not match rows where dt has a value
+ self.assertFalse(pred.test(OffsetRow(('2024-01-15', 'us-east-1'), 0,
2)))
+ # Should not match rows where region differs
+ self.assertFalse(pred.test(OffsetRow((None, 'us-west-2'), 0, 2)))
+
+ def test_overwrite_default_partition_name_treated_as_null(self, *_):
+ """Test that overwrite with default partition name string is treated
as null."""
+ commit = self._create_commit()
+ target = {'dt': '__DEFAULT_PARTITION__', 'region': 'us-east-1'}
+ commit.overwrite(target, [self._msg((None, 'us-east-1'))], 1)
+
+ pred = self._extract_partition_predicate(commit)
+ # __DEFAULT_PARTITION__ should be treated like None (isNull)
+ self.assertTrue(pred.test(OffsetRow((None, 'us-east-1'), 0, 2)))
+ self.assertFalse(pred.test(OffsetRow(('2024-01-15', 'us-east-1'), 0,
2)))
+
+ def test_overwrite_all_null_partition_values(self, *_):
+ """Test overwrite where all partition values are None."""
+ commit = self._create_commit()
+ target = {'dt': None, 'region': None}
+ commit.overwrite(target, [self._msg((None, None))], 1)
+
+ pred = self._extract_partition_predicate(commit)
+ self.assertTrue(pred.test(OffsetRow((None, None), 0, 2)))
+ self.assertFalse(pred.test(OffsetRow((None, 'us-east-1'), 0, 2)))
+ self.assertFalse(pred.test(OffsetRow(('2024-01-15', None), 0, 2)))
+
+ def test_overwrite_null_partition_rejects_mismatched(self, *_):
+ """Test that overwrite with null partition rejects rows that don't
match."""
+ commit = self._create_commit()
+ target = {'dt': None, 'region': 'us-east-1'}
+ # Trying to overwrite null dt partition with data that has a non-null
dt
+ with self.assertRaises(RuntimeError) as ctx:
+ commit._create_static_partition_filter(target,
[self._msg(('2024-01-15', 'us-east-1'))])
+ self.assertIn('does not belong to this partition', str(ctx.exception))
+
+ def test_dynamic_overwrite_null_partition_value(self, *_):
+ """Test dynamic partition overwrite with None partition values."""
+ commit = self._create_commit()
+ self.table.options.dynamic_partition_overwrite.return_value = True
+ commit.overwrite({}, [self._msg((None, 'us-east-1'))], 1)
+
+ pred = self._extract_partition_predicate(commit)
+ self.assertTrue(pred.test(OffsetRow((None, 'us-east-1'), 0, 2)))
+ self.assertFalse(pred.test(OffsetRow(('2024-01-15', 'us-east-1'), 0,
2)))
+
+ def test_dynamic_overwrite_mixed_null_and_nonnull(self, *_):
+ """Test dynamic partition overwrite with both null and non-null
partitions."""
+ commit = self._create_commit()
+ self.table.options.dynamic_partition_overwrite.return_value = True
+ commit.overwrite({}, [
+ self._msg(('2024-01-15', 'us-east-1')),
+ self._msg((None, 'us-west-2')),
+ ], 1)
+
+ pred = self._extract_partition_predicate(commit)
+ self.assertTrue(pred.test(OffsetRow(('2024-01-15', 'us-east-1'), 0,
2)))
+ self.assertTrue(pred.test(OffsetRow((None, 'us-west-2'), 0, 2)))
+ self.assertFalse(pred.test(OffsetRow(('2024-01-16', 'eu-west-1'), 0,
2)))
+
+ def test_drop_partitions_null_partition_value(self, *_):
+ """Test drop_partitions with default partition name string
representing null."""
+ commit = self._create_commit()
+ commit.drop_partitions([
+ {'dt': '__DEFAULT_PARTITION__', 'region': 'us-east-1'},
+ {'dt': '2024-01-16', 'region': 'us-west-2'},
+ ], 1)
+
+ pred = self._extract_partition_predicate(commit)
+ self.assertTrue(pred.test(OffsetRow((None, 'us-east-1'), 0, 2)))
+ self.assertTrue(pred.test(OffsetRow(('2024-01-16', 'us-west-2'), 0,
2)))
+ self.assertFalse(pred.test(OffsetRow(('2024-01-15', 'us-east-1'), 0,
2)))
+ self.assertFalse(pred.test(OffsetRow((None, 'us-west-2'), 0, 2)))
+
class TestCommitScannerPartitionPredicate(unittest.TestCase):
@@ -223,6 +307,18 @@ class
TestCommitScannerPartitionPredicate(unittest.TestCase):
self.assertTrue(pred.test(GenericRow(['2024-01-16', 'us-west-2'],
PARTITION_FIELDS)))
self.assertFalse(pred.test(GenericRow(['2024-01-17', 'eu-west-1'],
PARTITION_FIELDS)))
+ def test_filter_handles_null_partition_values(self):
+ scanner = self._scanner()
+ pred = scanner._build_partition_filter_from_entries([
+ _manifest_entry([None, 'us-east-1']),
+ _manifest_entry(['2024-01-16', 'us-west-2']),
+ ])
+
+ self.assertTrue(pred.test(GenericRow([None, 'us-east-1'],
PARTITION_FIELDS)))
+ self.assertTrue(pred.test(GenericRow(['2024-01-16', 'us-west-2'],
PARTITION_FIELDS)))
+ self.assertFalse(pred.test(GenericRow(['2024-01-15', 'us-east-1'],
PARTITION_FIELDS)))
+ self.assertFalse(pred.test(GenericRow([None, 'us-west-2'],
PARTITION_FIELDS)))
+
def test_filter_none_without_partition_keys(self):
scanner = CommitScanner(Mock(partition_keys=[]), Mock())
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index 5258aec1d0..2a39628136 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -128,6 +128,71 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
+ def test_dynamic_partition_overwrite(self):
+ pa_schema = pa.schema([
+ ('f0', pa.string()),
+ ('f1', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0'])
+ self.rest_catalog.create_table('default.test_dynamic_overwrite',
schema, False)
+ table = self.rest_catalog.get_table('default.test_dynamic_overwrite')
+ read_builder = table.new_read_builder()
+
+ # Write initial non-null and null partitions
+ self._batch_write(table, pd.DataFrame({
+ 'f0': ['a', 'b', None],
+ 'f1': ['apple', 'banana', 'cherry'],
+ }))
+
+ # Dynamic overwrite partition f0='a' only; 'b' and null untouched
+ self._batch_overwrite(table, pd.DataFrame({
+ 'f0': ['a'],
+ 'f1': ['watermelon'],
+ }))
+
+ self._assert_table_equals(read_builder, pd.DataFrame({
+ 'f0': ['a', 'b', None],
+ 'f1': ['watermelon', 'banana', 'cherry'],
+ }), sort_by='f0')
+
+ # Dynamic overwrite partitions f0='a' and f0=None; 'b' untouched
+ self._batch_overwrite(table, pd.DataFrame({
+ 'f0': ['a', None],
+ 'f1': ['mango', 'grape'],
+ }))
+
+ self._assert_table_equals(read_builder, pd.DataFrame({
+ 'f0': ['a', 'b', None],
+ 'f1': ['mango', 'banana', 'grape'],
+ }), sort_by='f0')
+
+ def _batch_write(self, table, df):
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_pandas(df)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ def _batch_overwrite(self, table, df, partition=None):
+ write_builder = table.new_batch_write_builder().overwrite(partition)
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_pandas(df)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ def _assert_table_equals(self, read_builder, expected_df, sort_by=None):
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_df = table_read.to_pandas(table_scan.plan().splits())
+ if sort_by:
+ actual_df = actual_df.sort_values(by=sort_by)
+ pd.testing.assert_frame_equal(
+ actual_df.reset_index(drop=True),
expected_df.reset_index(drop=True))
+
def test_full_data_types(self):
simple_pa_schema = pa.schema([
('f0', pa.int8()),
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py
b/paimon-python/pypaimon/tests/reader_base_test.py
index f1b147d147..e64d4ab35c 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -162,6 +162,71 @@ class ReaderBasicTest(unittest.TestCase):
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
+ def test_dynamic_partition_overwrite(self):
+ pa_schema = pa.schema([
+ ('f0', pa.string()),
+ ('f1', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0'])
+ self.catalog.create_table('default.test_dynamic_overwrite', schema,
False)
+ table = self.catalog.get_table('default.test_dynamic_overwrite')
+ read_builder = table.new_read_builder()
+
+ # Write initial non-null and null partitions
+ self._batch_write(table, pd.DataFrame({
+ 'f0': ['a', 'b', None],
+ 'f1': ['apple', 'banana', 'cherry'],
+ }))
+
+ # Dynamic overwrite partition f0='a' only; 'b' and null untouched
+ self._batch_overwrite(table, pd.DataFrame({
+ 'f0': ['a'],
+ 'f1': ['watermelon'],
+ }))
+
+ self._assert_table_equals(read_builder, pd.DataFrame({
+ 'f0': ['a', 'b', None],
+ 'f1': ['watermelon', 'banana', 'cherry'],
+ }), sort_by='f0')
+
+ # Dynamic overwrite partitions f0='a' and f0=None; 'b' untouched
+ self._batch_overwrite(table, pd.DataFrame({
+ 'f0': ['a', None],
+ 'f1': ['mango', 'grape'],
+ }))
+
+ self._assert_table_equals(read_builder, pd.DataFrame({
+ 'f0': ['a', 'b', None],
+ 'f1': ['mango', 'banana', 'grape'],
+ }), sort_by='f0')
+
+ def _batch_write(self, table, df):
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_pandas(df)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ def _batch_overwrite(self, table, df, partition=None):
+ write_builder = table.new_batch_write_builder().overwrite(partition)
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_pandas(df)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ def _assert_table_equals(self, read_builder, expected_df, sort_by=None):
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_df = table_read.to_pandas(table_scan.plan().splits())
+ if sort_by:
+ actual_df = actual_df.sort_values(by=sort_by)
+ pd.testing.assert_frame_equal(
+ actual_df.reset_index(drop=True),
expected_df.reset_index(drop=True))
+
def test_full_data_types(self):
simple_pa_schema = pa.schema([
('f0', pa.int8()),
diff --git a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
index 455cc6cdf9..ce4fabda19 100644
--- a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
@@ -116,6 +116,71 @@ class RESTTableReadWriteTest(RESTBaseTest):
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
+ def test_dynamic_partition_overwrite(self):
+ pa_schema = pa.schema([
+ ('f0', pa.string()),
+ ('f1', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0'])
+ self.rest_catalog.create_table('default.test_dynamic_overwrite',
schema, False)
+ table = self.rest_catalog.get_table('default.test_dynamic_overwrite')
+ read_builder = table.new_read_builder()
+
+ # Write initial non-null and null partitions
+ self._batch_write(table, pd.DataFrame({
+ 'f0': ['a', 'b', None],
+ 'f1': ['apple', 'banana', 'cherry'],
+ }))
+
+ # Dynamic overwrite partition f0='a' only; 'b' and null untouched
+ self._batch_overwrite(table, pd.DataFrame({
+ 'f0': ['a'],
+ 'f1': ['watermelon'],
+ }))
+
+ self._assert_table_equals(read_builder, pd.DataFrame({
+ 'f0': ['a', 'b', None],
+ 'f1': ['watermelon', 'banana', 'cherry'],
+ }), sort_by='f0')
+
+ # Dynamic overwrite partitions f0='a' and f0=None; 'b' untouched
+ self._batch_overwrite(table, pd.DataFrame({
+ 'f0': ['a', None],
+ 'f1': ['mango', 'grape'],
+ }))
+
+ self._assert_table_equals(read_builder, pd.DataFrame({
+ 'f0': ['a', 'b', None],
+ 'f1': ['mango', 'banana', 'grape'],
+ }), sort_by='f0')
+
+ def _batch_write(self, table, df):
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_pandas(df)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ def _batch_overwrite(self, table, df, partition=None):
+ write_builder = table.new_batch_write_builder().overwrite(partition)
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_pandas(df)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ def _assert_table_equals(self, read_builder, expected_df, sort_by=None):
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_df = table_read.to_pandas(table_scan.plan().splits())
+ if sort_by:
+ actual_df = actual_df.sort_values(by=sort_by)
+ pd.testing.assert_frame_equal(
+ actual_df.reset_index(drop=True),
expected_df.reset_index(drop=True))
+
def test_parquet_ao_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_parquet',
schema, False)
diff --git a/paimon-python/pypaimon/tests/table_commit_test.py
b/paimon-python/pypaimon/tests/table_commit_test.py
new file mode 100644
index 0000000000..87fb6a50dd
--- /dev/null
+++ b/paimon-python/pypaimon/tests/table_commit_test.py
@@ -0,0 +1,102 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import unittest
+from unittest.mock import Mock
+
+from parameterized import parameterized
+
+from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+from pypaimon.write.commit_message import CommitMessage
+from pypaimon.write.table_commit import BatchTableCommit, StreamTableCommit
+
+
+class TestTableCommitEmptyOverwrite(unittest.TestCase):
+ """Tests for TableCommit._commit handling of empty commit messages in
overwrite mode."""
+
+ def _create_commit(self, cls, overwrite_partition=None):
+ commit = cls.__new__(cls)
+ commit.table = Mock()
+ commit.table.identifier = 'default.test_table'
+ commit.commit_user = 'test_user'
+ commit.overwrite_partition = overwrite_partition
+ commit.file_store_commit = Mock()
+ commit.batch_committed = False
+ return commit, commit.file_store_commit
+
+ # -- Overwrite mode: should always call overwrite(), even with empty
messages --
+
+ @parameterized.expand([
+ ("no_messages", []),
+ ("all_empty", [False]),
+ ("non_empty", [True]),
+ ("mixed", [False, True]),
+ ])
+ def test_overwrite_forwards_filtered_messages(self, name, msg_flags):
+ """Overwrite mode should always call overwrite(), filtering out empty
messages."""
+ commit, mock_fsc = self._create_commit(BatchTableCommit,
overwrite_partition={'f0': 1})
+
+ messages = [
+ CommitMessage(partition=(1,), bucket=0, new_files=[Mock()] if
has_files else [])
+ for has_files in msg_flags
+ ]
+ commit.commit(messages)
+
+ mock_fsc.overwrite.assert_called_once_with(
+ overwrite_partition={'f0': 1},
+ commit_messages=[m for m in messages if not m.is_empty()],
+ commit_identifier=BATCH_COMMIT_IDENTIFIER,
+ )
+
+ # -- Append mode: should only call commit() when there are non-empty
messages --
+
+ @parameterized.expand([
+ ("no_messages", []),
+ ("all_empty", [False]),
+ ("non_empty", [True]),
+ ])
+ def test_append_forwards_non_empty_messages(self, name, msg_flags):
+ """Append mode should only call commit() when there are non-empty
messages."""
+ commit, mock_fsc = self._create_commit(BatchTableCommit,
overwrite_partition=None)
+
+ messages = [
+ CommitMessage(partition=(), bucket=0, new_files=[Mock()] if
has_files else [])
+ for has_files in msg_flags
+ ]
+ commit.commit(messages)
+
+ if any(msg_flags):
+ mock_fsc.commit.assert_called_once_with(
+ commit_messages=[m for m in messages if not m.is_empty()],
+ commit_identifier=BATCH_COMMIT_IDENTIFIER,
+ )
+ else:
+ mock_fsc.commit.assert_not_called()
+ mock_fsc.overwrite.assert_not_called()
+
+ # -- StreamTableCommit overwrite should also reach overwrite() with empty
messages --
+
+ def test_stream_commit_overwrite_empty_messages(self):
+ commit, mock_fsc = self._create_commit(StreamTableCommit,
overwrite_partition={'dt': '2024-01-15'})
+
+ commit.commit([], commit_identifier=42)
+
+ mock_fsc.overwrite.assert_called_once_with(
+ overwrite_partition={'dt': '2024-01-15'},
+ commit_messages=[],
+ commit_identifier=42,
+ )
diff --git a/paimon-python/pypaimon/write/commit/commit_scanner.py
b/paimon-python/pypaimon/write/commit/commit_scanner.py
index 300215f774..fe161c9a95 100644
--- a/paimon-python/pypaimon/write/commit/commit_scanner.py
+++ b/paimon-python/pypaimon/write/commit/commit_scanner.py
@@ -121,7 +121,10 @@ class CommitScanner:
for partition_values in changed_partitions:
sub_predicates = []
for i, key in enumerate(partition_keys):
- sub_predicates.append(predicate_builder.equal(key,
partition_values[i]))
+ if partition_values[i] is None:
+ sub_predicates.append(predicate_builder.is_null(key))
+ else:
+ sub_predicates.append(predicate_builder.equal(key,
partition_values[i]))
partition_predicates.append(predicate_builder.and_predicates(sub_predicates))
return predicate_builder.or_predicates(partition_predicates)
diff --git a/paimon-python/pypaimon/write/file_store_commit.py
b/paimon-python/pypaimon/write/file_store_commit.py
index 6eb7f5f5da..4c4338d620 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -22,6 +22,7 @@ import time
import uuid
from typing import Dict, List, Optional
+from pypaimon.common.options.core_options import CoreOptions
from pypaimon.common.predicate_builder import PredicateBuilder
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
@@ -158,37 +159,33 @@ class FileStoreCommit:
def overwrite(self, overwrite_partition, commit_messages:
List[CommitMessage], commit_identifier: int):
"""Commit the given commit messages in overwrite mode."""
- if not commit_messages:
- return
-
logger.info(
"Ready to overwrite to table %s, number of commit messages: %d",
self.table.identifier,
len(commit_messages),
)
+ skip_overwrite = False
partition_filter = None
- # sanity check, all changes must be done within the given partition,
meanwhile build a partition filter
- if len(overwrite_partition) > 0:
- predicate_builder =
PredicateBuilder(self.table.partition_keys_fields)
- sub_predicates = []
- for key, value in overwrite_partition.items():
- sub_predicates.append(predicate_builder.equal(key, value))
- partition_filter = predicate_builder.and_predicates(sub_predicates)
- for msg in commit_messages:
- row = OffsetRow(msg.partition, 0, len(msg.partition))
- if not partition_filter.test(row):
- raise RuntimeError(f"Trying to overwrite partition
{overwrite_partition}, but the changes "
- f"in {msg.partition} does not belong to
this partition")
+ # Partition filter is built from dynamic or static partition according
to options.
+ if len(self.table.partition_keys) > 0 and
self.table.options.dynamic_partition_overwrite():
+ if not commit_messages:
+ # In dynamic mode, if there are no changes to commit, no data
will be deleted
+ skip_overwrite = True
+ else:
+ partition_filter =
self._create_dynamic_partition_filter(commit_messages)
+ else:
+ partition_filter =
self._create_static_partition_filter(overwrite_partition, commit_messages)
- self._try_commit(
- commit_kind="OVERWRITE",
- commit_identifier=commit_identifier,
- commit_entries_plan=lambda snapshot:
self._generate_overwrite_entries(
- snapshot, partition_filter, commit_messages),
- detect_conflicts=True,
- allow_rollback=False,
- )
+ if not skip_overwrite:
+ self._try_commit(
+ commit_kind="OVERWRITE",
+ commit_identifier=commit_identifier,
+ commit_entries_plan=lambda snapshot:
self._generate_overwrite_entries(
+ snapshot, partition_filter, commit_messages),
+ detect_conflicts=True,
+ allow_rollback=False,
+ )
def drop_partitions(self, partitions: List[Dict[str, str]],
commit_identifier: int) -> None:
if not partitions:
@@ -204,12 +201,16 @@ class FileStoreCommit:
)
predicate_builder = PredicateBuilder(self.table.partition_keys_fields)
+ default_part_value = self.table.options.options.get(
+ CoreOptions.PARTITION_DEFAULT_NAME, "__DEFAULT_PARTITION__")
partition_predicates = []
for part in partitions:
- sub_predicates = [
- predicate_builder.equal(key, value)
- for key, value in part.items()
- ]
+ sub_predicates = []
+ for key, value in part.items():
+ if value is None or (isinstance(value, str) and value ==
default_part_value):
+ sub_predicates.append(predicate_builder.is_null(key))
+ else:
+ sub_predicates.append(predicate_builder.equal(key, value))
if sub_predicates:
pred = predicate_builder.and_predicates(sub_predicates)
if pred is not None:
@@ -517,6 +518,45 @@ class FileStoreCommit:
return True
return False
+ def _create_dynamic_partition_filter(self, commit_messages:
List[CommitMessage]):
+ """Build a partition filter from the unique partitions present in
commit_messages."""
+ predicate_builder = PredicateBuilder(self.table.partition_keys_fields)
+ predicates = []
+ seen_partitions = set()
+ for msg in commit_messages:
+ partition_values = tuple(msg.partition)
+ if partition_values not in seen_partitions:
+ seen_partitions.add(partition_values)
+ equalities = []
+ for name, value in zip(self.table.partition_keys,
msg.partition):
+ if value is None:
+ equalities.append(predicate_builder.is_null(name))
+ else:
+ equalities.append(predicate_builder.equal(name, value))
+ predicates.append(predicate_builder.and_predicates(equalities))
+ return predicate_builder.or_predicates(predicates)
+
+ def _create_static_partition_filter(self, overwrite_partition,
commit_messages: List[CommitMessage]):
+ """Build a partition filter from the explicit overwrite_partition
spec."""
+ if not overwrite_partition:
+ return None
+ predicate_builder = PredicateBuilder(self.table.partition_keys_fields)
+ default_part_value = self.table.options.options.get(
+ CoreOptions.PARTITION_DEFAULT_NAME, "__DEFAULT_PARTITION__")
+ equalities = []
+ for key, value in overwrite_partition.items():
+ if value is None or (isinstance(value, str) and value ==
default_part_value):
+ equalities.append(predicate_builder.is_null(key))
+ else:
+ equalities.append(predicate_builder.equal(key, value))
+ partition_filter = predicate_builder.and_predicates(equalities)
+ for msg in commit_messages:
+ row = OffsetRow(msg.partition, 0, len(msg.partition))
+ if not partition_filter.test(row):
+ raise RuntimeError(f"Trying to overwrite partition
{overwrite_partition}, but the changes "
+ f"in {msg.partition} does not belong to
this partition")
+ return partition_filter
+
def _generate_overwrite_entries(self, latest_snapshot, partition_filter,
commit_messages):
"""Generate commit entries for OVERWRITE mode based on latest
snapshot."""
entries = []
diff --git a/paimon-python/pypaimon/write/table_commit.py
b/paimon-python/pypaimon/write/table_commit.py
index 19918c1782..bafaeddf6c 100644
--- a/paimon-python/pypaimon/write/table_commit.py
+++ b/paimon-python/pypaimon/write/table_commit.py
@@ -48,20 +48,27 @@ class TableCommit:
self._check_committed()
non_empty_messages = [msg for msg in commit_messages if not
msg.is_empty()]
- if not non_empty_messages:
- return
- logger.info(
- "Committing batch table %s, %d non-empty messages",
- self.table.identifier, len(non_empty_messages)
- )
if self.overwrite_partition is not None:
+ # Always call overwrite() even with empty messages, so that
+ # FileStoreCommit.overwrite can handle the empty case properly
+ # (e.g. static overwrite with empty data should delete the
partition).
+ logger.info(
+ "Committing overwrite to table %s, %d non-empty messages",
+ self.table.identifier, len(non_empty_messages)
+ )
self.file_store_commit.overwrite(
overwrite_partition=self.overwrite_partition,
commit_messages=non_empty_messages,
commit_identifier=commit_identifier
)
else:
+ if not non_empty_messages:
+ return
+ logger.info(
+ "Committing batch table %s, %d non-empty messages",
+ self.table.identifier, len(non_empty_messages)
+ )
self.file_store_commit.commit(
commit_messages=non_empty_messages,
commit_identifier=commit_identifier