This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new df4b475ec6 [python] Generate input changelogs from Python writer (#7739)
df4b475ec6 is described below
commit df4b475ec6cbc7ede22b61c84edc79103c56c584
Author: junmuz <[email protected]>
AuthorDate: Sat Jun 6 09:36:29 2026 +0100
[python] Generate input changelogs from Python writer (#7739)
---
.../pypaimon/common/options/core_options.py | 11 +
paimon-python/pypaimon/schema/schema.py | 7 +
.../pypaimon/tests/file_store_commit_test.py | 1 +
.../pypaimon/tests/reader_append_only_test.py | 1 +
.../tests/write/changelog_producer_test.py | 375 +++++++++++++++++++++
paimon-python/pypaimon/write/commit_message.py | 3 +-
paimon-python/pypaimon/write/file_store_commit.py | 73 +++-
paimon-python/pypaimon/write/file_store_write.py | 14 +-
paimon-python/pypaimon/write/writer/data_writer.py | 91 +++--
.../write/writer/dedicated_format_writer.py | 8 +-
.../pypaimon/write/writer/key_value_data_writer.py | 6 +-
11 files changed, 552 insertions(+), 38 deletions(-)
diff --git a/paimon-python/pypaimon/common/options/core_options.py
b/paimon-python/pypaimon/common/options/core_options.py
index 874b888faf..e84ea6c4e1 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -470,6 +470,14 @@ class CoreOptions:
"Options: none, input, full-compaction, lookup.")
)
+ CHANGELOG_FILE_FORMAT: ConfigOption[str] = (
+ ConfigOptions.key("changelog-file.format")
+ .string_type()
+ .no_default_value()
+ .with_description("Specify the file format of changelog files. "
+ "Currently parquet, avro and orc are supported.")
+ )
+
MERGE_ENGINE: ConfigOption[MergeEngine] = (
ConfigOptions.key("merge-engine")
.enum_type(MergeEngine)
@@ -959,6 +967,9 @@ class CoreOptions:
def changelog_producer(self, default=None):
return self.options.get(CoreOptions.CHANGELOG_PRODUCER, default)
+ def changelog_file_format(self, default=None):
+ return self.options.get(CoreOptions.CHANGELOG_FILE_FORMAT, default)
+
def merge_engine(self, default=None):
return self.options.get(CoreOptions.MERGE_ENGINE, default)
diff --git a/paimon-python/pypaimon/schema/schema.py
b/paimon-python/pypaimon/schema/schema.py
index f3a63c88e1..3354afebc8 100644
--- a/paimon-python/pypaimon/schema/schema.py
+++ b/paimon-python/pypaimon/schema/schema.py
@@ -48,6 +48,13 @@ class Schema:
self.options = options if options is not None else {}
self.comment = comment
+ changelog_producer =
self.options.get(CoreOptions.CHANGELOG_PRODUCER.key(), 'none')
+ if changelog_producer != 'none' and not self.primary_keys:
+ raise ValueError(
+ f"Cannot set 'changelog-producer' to '{changelog_producer}' on
a table without primary keys. "
+ f"Changelog producer requires primary keys to be defined."
+ )
+
@staticmethod
def from_pyarrow_schema(pa_schema: pa.Schema, partition_keys:
Optional[List[str]] = None,
primary_keys: Optional[List[str]] = None, options:
Optional[Dict] = None,
diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py
b/paimon-python/pypaimon/tests/file_store_commit_test.py
index ec13d4041a..8b6b892bae 100644
--- a/paimon-python/pypaimon/tests/file_store_commit_test.py
+++ b/paimon-python/pypaimon/tests/file_store_commit_test.py
@@ -435,6 +435,7 @@ class TestFileStoreCommit(unittest.TestCase):
retry_result=None,
commit_kind="APPEND",
commit_entries=[commit_entry],
+ changelog_entries=[],
commit_identifier=11,
latest_snapshot=latest_snapshot
)
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py
b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 68d268d655..34509db74a 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -635,6 +635,7 @@ class AoReaderTest(unittest.TestCase):
RetryResult(None),
"APPEND",
commit_entries,
+ [],
BATCH_COMMIT_IDENTIFIER,
latest_snapshot)
self.assertTrue(success.is_success())
diff --git a/paimon-python/pypaimon/tests/write/changelog_producer_test.py
b/paimon-python/pypaimon/tests/write/changelog_producer_test.py
new file mode 100644
index 0000000000..b789593038
--- /dev/null
+++ b/paimon-python/pypaimon/tests/write/changelog_producer_test.py
@@ -0,0 +1,375 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import glob
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+import json
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.schema.data_types import DataField, AtomicType
+from pypaimon.write.commit_message import CommitMessage
+
+
+class ChangelogProducerTest(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('default', True)
+ cls.pk_schema = pa.schema([
+ pa.field('user_id', pa.int32(), nullable=False),
+ ('item_id', pa.int64()),
+ ('behavior', pa.string()),
+ pa.field('dt', pa.string(), nullable=False)
+ ])
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ def _create_table(self, table_name, options=None):
+ schema = Schema.from_pyarrow_schema(
+ self.pk_schema,
+ partition_keys=['dt'],
+ primary_keys=['user_id', 'dt'],
+ options=options or {}
+ )
+ self.catalog.create_table(f'default.{table_name}', schema, False)
+ return self.catalog.get_table(f'default.{table_name}')
+
+ def _sample_data(self):
+ return pa.Table.from_pydict({
+ 'user_id': [1, 2, 3],
+ 'item_id': [101, 102, 103],
+ 'behavior': ['click', 'buy', 'view'],
+ 'dt': ['p1', 'p1', 'p1']
+ }, schema=self.pk_schema)
+
+ def test_commit_message_with_changelog(self):
+ msg = CommitMessage(partition=('p1',), bucket=0, new_files=[],
changelog_files=[])
+ self.assertTrue(msg.is_empty())
+
+ msg2 = CommitMessage(partition=('p1',), bucket=0, new_files=['fake'])
+ self.assertFalse(msg2.is_empty())
+ self.assertEqual(msg2.changelog_files, [])
+
+ def test_full_compaction_and_lookup_no_changelog_from_writer(self):
+ """FULL_COMPACTION and LOOKUP rely on dedicated compaction for
changelog,
+ so the Python writer should not produce changelog files for these
modes."""
+ for mode in ['full-compaction', 'lookup']:
+ table = self._create_table(
+ f'test_no_changelog_{mode.replace("-", "_")}',
+ options={'changelog-producer': mode, 'bucket': '1'}
+ )
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ table_write.write_arrow(self._sample_data())
+ table_commit.commit(table_write.prepare_commit())
+
+ bucket_dir = os.path.join(
+ self.warehouse, 'default.db',
+ f'test_no_changelog_{mode.replace("-", "_")}', 'dt=p1',
'bucket-0')
+ changelog_files = glob.glob(os.path.join(bucket_dir,
'changelog-*'))
+ self.assertEqual(len(changelog_files), 0,
+ f"Writer should not produce changelog files for
{mode}")
+
+ table_write.close()
+ table_commit.close()
+
+ def test_none_mode_no_changelog(self):
+ table = self._create_table(
+ 'test_none_mode',
+ options={'changelog-producer': 'none', 'bucket': '1'}
+ )
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ table_write.write_arrow(self._sample_data())
+ table_commit.commit(table_write.prepare_commit())
+
+ snapshot_path = glob.glob(
+ os.path.join(self.warehouse, 'default.db', 'test_none_mode',
'snapshot', 'snapshot-*'))
+ self.assertTrue(len(snapshot_path) > 0)
+
+ snapshot_json = open(snapshot_path[0]).read()
+ snapshot = json.loads(snapshot_json)
+ self.assertNotIn('changelogManifestList', snapshot)
+
+ table_write.close()
+ table_commit.close()
+
+ def test_input_mode_produces_changelog_files(self):
+ table = self._create_table(
+ 'test_input_files',
+ options={'changelog-producer': 'input', 'bucket': '1'}
+ )
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ table_write.write_arrow(self._sample_data())
+ table_commit.commit(table_write.prepare_commit())
+
+ bucket_dir = os.path.join(
+ self.warehouse, 'default.db', 'test_input_files', 'dt=p1',
'bucket-0')
+ data_files = glob.glob(os.path.join(bucket_dir, 'data-*'))
+ changelog_files = glob.glob(os.path.join(bucket_dir, 'changelog-*'))
+ self.assertTrue(len(data_files) > 0, "Should have data files")
+ self.assertTrue(len(changelog_files) > 0, "Should have changelog
files")
+
+ table_write.close()
+ table_commit.close()
+
+ def test_input_mode_snapshot_has_changelog_manifest(self):
+ table = self._create_table(
+ 'test_input_snapshot',
+ options={'changelog-producer': 'input', 'bucket': '1'}
+ )
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ table_write.write_arrow(self._sample_data())
+ table_commit.commit(table_write.prepare_commit())
+
+ snapshot_path = glob.glob(
+ os.path.join(self.warehouse, 'default.db', 'test_input_snapshot',
+ 'snapshot', 'snapshot-*'))
+ self.assertTrue(len(snapshot_path) > 0)
+
+ snapshot_json = open(snapshot_path[0]).read()
+ snapshot = json.loads(snapshot_json)
+ self.assertIn('changelogManifestList', snapshot)
+ self.assertIsNotNone(snapshot['changelogManifestList'])
+ self.assertIn('changelogRecordCount', snapshot)
+ self.assertEqual(snapshot['changelogRecordCount'], 3)
+ self.assertIn('changelogManifestListSize', snapshot)
+
+ table_write.close()
+ table_commit.close()
+
+ def test_input_mode_changelog_manifest_readable(self):
+ table = self._create_table(
+ 'test_input_readable',
+ options={'changelog-producer': 'input', 'bucket': '1'}
+ )
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ table_write.write_arrow(self._sample_data())
+ table_commit.commit(table_write.prepare_commit())
+
+ from pypaimon.snapshot.snapshot_manager import SnapshotManager
+ snapshot_manager = SnapshotManager(table.file_io, table.table_path)
+ snapshot = snapshot_manager.get_latest_snapshot()
+
+ self.assertIsNotNone(snapshot.changelog_manifest_list)
+
+ manifest_list_manager = ManifestListManager(table)
+ changelog_manifests = manifest_list_manager.read_changelog(snapshot)
+ self.assertTrue(len(changelog_manifests) > 0)
+
+ total_changelog_added = sum(m.num_added_files for m in
changelog_manifests)
+ self.assertTrue(total_changelog_added > 0)
+
+ table_write.close()
+ table_commit.close()
+
+ def test_input_mode_multiple_commits(self):
+ table = self._create_table(
+ 'test_input_multi',
+ options={'changelog-producer': 'input', 'bucket': '1'}
+ )
+
+ # First commit
+ write_builder1 = table.new_batch_write_builder()
+ table_write1 = write_builder1.new_write()
+ table_commit1 = write_builder1.new_commit()
+ table_write1.write_arrow(self._sample_data())
+ table_commit1.commit(table_write1.prepare_commit())
+ table_write1.close()
+ table_commit1.close()
+
+ # Second commit with different data
+ write_builder2 = table.new_batch_write_builder()
+ table_write2 = write_builder2.new_write()
+ table_commit2 = write_builder2.new_commit()
+ data2 = pa.Table.from_pydict({
+ 'user_id': [4, 5],
+ 'item_id': [104, 105],
+ 'behavior': ['click', 'buy'],
+ 'dt': ['p1', 'p1']
+ }, schema=self.pk_schema)
+ table_write2.write_arrow(data2)
+ table_commit2.commit(table_write2.prepare_commit())
+ table_write2.close()
+ table_commit2.close()
+
+ from pypaimon.snapshot.snapshot_manager import SnapshotManager
+ snapshot_manager = SnapshotManager(table.file_io, table.table_path)
+ snapshot = snapshot_manager.get_latest_snapshot()
+ self.assertEqual(snapshot.id, 2)
+ self.assertIsNotNone(snapshot.changelog_manifest_list)
+ self.assertEqual(snapshot.changelog_record_count, 2)
+
+ def test_abort_cleans_up_changelog_files(self):
+ table = self._create_table(
+ 'test_input_abort',
+ options={'changelog-producer': 'input', 'bucket': '1'}
+ )
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ table_write.write_arrow(self._sample_data())
+ commit_messages = table_write.prepare_commit()
+
+ bucket_dir = os.path.join(
+ self.warehouse, 'default.db', 'test_input_abort', 'dt=p1',
'bucket-0')
+ changelog_files_before = glob.glob(os.path.join(bucket_dir,
'changelog-*'))
+ self.assertTrue(len(changelog_files_before) > 0)
+
+ table_commit.abort(commit_messages)
+
+ data_files_after = glob.glob(os.path.join(bucket_dir, 'data-*'))
+ changelog_files_after = glob.glob(os.path.join(bucket_dir,
'changelog-*'))
+ self.assertEqual(len(data_files_after), 0, "Data files should be
cleaned up after abort")
+ self.assertEqual(len(changelog_files_after), 0, "Changelog files
should be cleaned up after abort")
+
+ table_write.close()
+ table_commit.close()
+
+ def test_reject_changelog_producer_on_append_only_table(self):
+ append_schema = pa.schema([
+ ('user_id', pa.int32()),
+ ('item_id', pa.int64()),
+ ('behavior', pa.string()),
+ ('dt', pa.string())
+ ])
+ for mode in ['input', 'full-compaction', 'lookup']:
+ with self.assertRaises(ValueError, msg=f"Should reject
changelog-producer={mode} without PKs"):
+ Schema.from_pyarrow_schema(
+ append_schema,
+ partition_keys=['dt'],
+ options={'changelog-producer': mode, 'bucket': '1'}
+ )
+
+ def test_reject_changelog_producer_on_direct_schema_construction(self):
+ for mode in ['input', 'full-compaction', 'lookup']:
+ with self.assertRaises(ValueError, msg=f"Should reject
changelog-producer={mode} without PKs"):
+ Schema(
+ fields=[
+ DataField(0, 'user_id', AtomicType('INT')),
+ DataField(1, 'item_id', AtomicType('BIGINT')),
+ DataField(2, 'behavior', AtomicType('STRING')),
+ DataField(3, 'dt', AtomicType('STRING')),
+ ],
+ partition_keys=['dt'],
+ primary_keys=[],
+ options={'changelog-producer': mode, 'bucket': '1'}
+ )
+
+ def test_changelog_producer_none_allowed_on_append_only_table(self):
+ append_schema = pa.schema([
+ ('user_id', pa.int32()),
+ ('item_id', pa.int64()),
+ ('behavior', pa.string()),
+ ('dt', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(
+ append_schema,
+ partition_keys=['dt'],
+ options={'changelog-producer': 'none', 'bucket': '1'}
+ )
+ self.assertIsNotNone(schema)
+
+ def test_input_mode_changelog_inherits_data_file_format(self):
+ table = self._create_table(
+ 'test_input_changelog_format',
+ options={'changelog-producer': 'input', 'bucket': '1',
'file.format': 'orc'}
+ )
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ table_write.write_arrow(self._sample_data())
+ table_commit.commit(table_write.prepare_commit())
+
+ bucket_dir = os.path.join(
+ self.warehouse, 'default.db', 'test_input_changelog_format',
'dt=p1', 'bucket-0')
+ changelog_files = glob.glob(os.path.join(bucket_dir, 'changelog-*'))
+ self.assertTrue(len(changelog_files) > 0, "Should have changelog
files")
+ for f in changelog_files:
+ self.assertTrue(f.endswith('.orc'),
+ f"Changelog file should inherit data file format
(orc), got {f}")
+
+ data_files = glob.glob(os.path.join(bucket_dir, 'data-*'))
+ self.assertTrue(len(data_files) > 0, "Should have data files")
+ for f in data_files:
+ self.assertTrue(f.endswith('.orc'),
+ f"Data file should use orc format, got {f}")
+
+ table_write.close()
+ table_commit.close()
+
+ def test_input_mode_changelog_respects_changelog_file_format(self):
+ table = self._create_table(
+ 'test_input_cl_file_fmt',
+ options={'changelog-producer': 'input', 'bucket': '1',
+ 'file.format': 'parquet', 'changelog-file.format': 'orc'}
+ )
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ table_write.write_arrow(self._sample_data())
+ table_commit.commit(table_write.prepare_commit())
+
+ bucket_dir = os.path.join(
+ self.warehouse, 'default.db', 'test_input_cl_file_fmt', 'dt=p1',
'bucket-0')
+ changelog_files = glob.glob(os.path.join(bucket_dir, 'changelog-*'))
+ self.assertTrue(len(changelog_files) > 0, "Should have changelog
files")
+ for f in changelog_files:
+ self.assertTrue(f.endswith('.orc'),
+ f"Changelog file should use orc format, got {f}")
+
+ data_files = glob.glob(os.path.join(bucket_dir, 'data-*'))
+ self.assertTrue(len(data_files) > 0, "Should have data files")
+ for f in data_files:
+ self.assertTrue(f.endswith('.parquet'),
+ f"Data file should use parquet format, got {f}")
+
+ table_write.close()
+ table_commit.close()
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/write/commit_message.py
b/paimon-python/pypaimon/write/commit_message.py
index 552df0000f..a8cc193727 100644
--- a/paimon-python/pypaimon/write/commit_message.py
+++ b/paimon-python/pypaimon/write/commit_message.py
@@ -31,6 +31,7 @@ class CommitMessage:
new_files: List[DataFileMeta]
check_from_snapshot: Optional[int] = -1
index_deletes: List['IndexManifestEntry'] = field(default_factory=list)
+ changelog_files: List[DataFileMeta] = field(default_factory=list)
def is_empty(self):
- return not self.new_files and not self.index_deletes
+ return not self.new_files and not self.index_deletes and not
self.changelog_files
diff --git a/paimon-python/pypaimon/write/file_store_commit.py
b/paimon-python/pypaimon/write/file_store_commit.py
index 41440ae862..49f12cfac7 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -140,8 +140,10 @@ class FileStoreCommit:
total_buckets=self.table.total_buckets,
file=file
))
+ changelog_entries = self._collect_changelog_entries(commit_messages)
- logger.info("Finished collecting changes, including: %d entries",
len(commit_entries))
+ logger.info("Finished collecting changes, including: %d entries, %d
changelog entries",
+ len(commit_entries), len(changelog_entries))
index_deletes = []
for msg in commit_messages:
@@ -182,6 +184,7 @@ class FileStoreCommit:
self._try_commit(commit_kind=commit_kind,
commit_identifier=commit_identifier,
commit_entries_plan=lambda snapshot: commit_entries,
+ changelog_entries=changelog_entries,
detect_conflicts=detect_conflicts,
allow_rollback=allow_rollback,
index_deletes=index_deletes)
@@ -206,12 +209,15 @@ class FileStoreCommit:
else:
partition_filter =
self._create_static_partition_filter(overwrite_partition, commit_messages)
+ changelog_entries = self._collect_changelog_entries(commit_messages)
+
if not skip_overwrite:
self._try_commit(
commit_kind="OVERWRITE",
commit_identifier=commit_identifier,
commit_entries_plan=lambda snapshot:
self._generate_overwrite_entries(
snapshot, partition_filter, commit_messages),
+ changelog_entries=changelog_entries,
detect_conflicts=True,
allow_rollback=False,
)
@@ -270,7 +276,7 @@ class FileStoreCommit:
)
def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan,
- detect_conflicts=False, allow_rollback=False,
index_deletes=None):
+ detect_conflicts=False, allow_rollback=False,
index_deletes=None, changelog_entries=None):
retry_count = 0
retry_result = None
@@ -288,6 +294,7 @@ class FileStoreCommit:
retry_result=retry_result,
commit_kind=commit_kind,
commit_entries=commit_entries,
+ changelog_entries=changelog_entries or [],
commit_identifier=commit_identifier,
latest_snapshot=latest_snapshot,
detect_conflicts=detect_conflicts,
@@ -341,7 +348,9 @@ class FileStoreCommit:
retry_count += 1
def _try_commit_once(self, retry_result: Optional[RetryResult],
commit_kind: str,
- commit_entries: List[ManifestEntry],
commit_identifier: int,
+ commit_entries: List[ManifestEntry],
+ changelog_entries: List[ManifestEntry],
+ commit_identifier: int,
latest_snapshot: Optional[Snapshot],
detect_conflicts: bool = False,
allow_rollback: bool = False,
@@ -381,10 +390,28 @@ class FileStoreCommit:
first_row_id_start = self._get_next_row_id_start(latest_snapshot)
commit_entries, next_row_id =
self._assign_row_tracking_meta(first_row_id_start, commit_entries)
+ changelog_manifest_list_name = None
+ changelog_manifest_list_size = None
+ changelog_record_count = None
try:
new_manifest_file_meta = self._write_manifest_file(commit_entries,
new_manifest_file)
self.manifest_list_manager.write(delta_manifest_list,
[new_manifest_file_meta])
+ # Write changelog manifest if changelog entries exist
+ if changelog_entries:
+ changelog_manifest_file =
f"manifest-{str(uuid.uuid4())}-changelog-0"
+ changelog_manifest_file_meta = self._write_manifest_file(
+ changelog_entries, changelog_manifest_file)
+ changelog_manifest_list_name =
f"manifest-list-{unique_id}-changelog"
+ self.manifest_list_manager.write(
+ changelog_manifest_list_name,
[changelog_manifest_file_meta])
+ manifest_path = self.manifest_list_manager.manifest_path
+ changelog_manifest_list_size =
self.table.file_io.get_file_size(
+ f"{manifest_path}/{changelog_manifest_list_name}")
+ # kind==0 means ADD; pypaimon producers only support additions
currently
+ changelog_record_count = sum(
+ entry.file.row_count for entry in changelog_entries if
entry.kind == 0)
+
# process existing_manifest
total_record_count = 0
if latest_snapshot:
@@ -421,6 +448,9 @@ class FileStoreCommit:
schema_id=self.table.table_schema.id,
base_manifest_list=base_manifest_list,
delta_manifest_list=delta_manifest_list,
+ changelog_manifest_list=changelog_manifest_list_name,
+ changelog_manifest_list_size=changelog_manifest_list_size,
+ changelog_record_count=changelog_record_count,
total_record_count=total_record_count,
delta_record_count=delta_record_count,
commit_user=self.commit_user,
@@ -434,7 +464,7 @@ class FileStoreCommit:
statistics = self._generate_partition_statistics(commit_entries)
except Exception as e:
self._cleanup_preparation_failure(delta_manifest_list,
base_manifest_list,
- new_index_manifest)
+ new_index_manifest,
changelog_manifest_list_name)
logger.warning(f"Exception occurs when preparing snapshot: {e}",
exc_info=True)
raise RuntimeError(f"Failed to prepare snapshot: {e}")
@@ -455,7 +485,7 @@ class FileStoreCommit:
commit_time_s,
)
self._cleanup_preparation_failure(delta_manifest_list,
base_manifest_list,
- new_index_manifest)
+ new_index_manifest,
changelog_manifest_list_name)
return RetryResult(latest_snapshot, None)
except Exception as e:
# Commit exception, not sure about the situation and should not
clean up the files
@@ -634,10 +664,25 @@ class FileStoreCommit:
time.sleep(total_wait_ms / 1000.0)
+ def _collect_changelog_entries(self, commit_messages: List[CommitMessage])
-> List[ManifestEntry]:
+ changelog_entries = []
+ for msg in commit_messages:
+ partition = GenericRow(list(msg.partition),
self.table.partition_keys_fields)
+ for file in msg.changelog_files:
+ changelog_entries.append(ManifestEntry(
+ kind=0,
+ partition=partition,
+ bucket=msg.bucket,
+ total_buckets=self.table.total_buckets,
+ file=file
+ ))
+ return changelog_entries
+
def _cleanup_preparation_failure(self,
delta_manifest_list: Optional[str],
base_manifest_list: Optional[str],
- index_manifest: Optional[str] = None):
+ index_manifest: Optional[str] = None,
+ changelog_manifest_list: Optional[str] =
None):
try:
manifest_path = self.manifest_list_manager.manifest_path
@@ -655,21 +700,31 @@ class FileStoreCommit:
if base_manifest_list:
base_path = f"{manifest_path}/{base_manifest_list}"
self.table.file_io.delete_quietly(base_path)
+
+ if changelog_manifest_list:
+ try:
+ changelog_manifests =
self.manifest_list_manager.read(changelog_manifest_list)
+ for manifest_meta in changelog_manifests:
+ manifest_file_path = (
+
f"{self.manifest_file_manager.manifest_path}/{manifest_meta.file_name}")
+ self.table.file_io.delete_quietly(manifest_file_path)
+ except Exception:
+ pass
+ changelog_path = f"{manifest_path}/{changelog_manifest_list}"
+ self.table.file_io.delete_quietly(changelog_path)
except Exception as e:
logger.warning(f"Failed to clean up temporary files during
preparation failure: {e}", exc_info=True)
def abort(self, commit_messages: List[CommitMessage]):
"""Abort commit and delete files. Uses external_path if available to
ensure proper scheme handling."""
for message in commit_messages:
- for file in message.new_files:
+ for file in list(message.new_files) +
list(message.changelog_files):
try:
path_to_delete = file.external_path if file.external_path
else file.file_path
if path_to_delete:
path_str = str(path_to_delete)
self.table.file_io.delete_quietly(path_str)
except Exception as e:
- import logging
- logger = logging.getLogger(__name__)
path_to_delete = file.external_path if file.external_path
else file.file_path
logger.warning(f"Failed to clean up file {path_to_delete}
during abort: {e}")
diff --git a/paimon-python/pypaimon/write/file_store_write.py
b/paimon-python/pypaimon/write/file_store_write.py
index c77f88e907..ee3b96d8d7 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -47,6 +47,7 @@ class FileStoreWrite:
self.blob_consumer = None
self.commit_identifier = 0
self.options = CoreOptions.copy(table.options)
+ self.changelog_producer = self.options.changelog_producer()
if self.table.bucket_mode() == BucketMode.POSTPONE_MODE:
self.options.set(CoreOptions.DATA_FILE_PREFIX,
(f"{self.options.data_file_prefix()}-u-{commit_user}"
@@ -78,6 +79,7 @@ class FileStoreWrite:
options=options,
write_cols=self.write_cols,
blob_consumer=self.blob_consumer,
+ changelog_producer=self.changelog_producer,
)
elif self._has_vector_columns() and options.with_vector_format():
return DataVectorWriter(
@@ -95,7 +97,8 @@ class FileStoreWrite:
bucket=bucket,
max_seq_number=max_seq_number(),
options=options,
- merge_function=self._build_pk_merge_function())
+ merge_function=self._build_pk_merge_function(),
+ changelog_producer=self.changelog_producer)
else:
seq_number = 0 if self.table.bucket_mode() ==
BucketMode.BUCKET_UNAWARE else max_seq_number()
return AppendOnlyDataWriter(
@@ -104,7 +107,8 @@ class FileStoreWrite:
bucket=bucket,
max_seq_number=seq_number,
options=options,
- write_cols=self.write_cols
+ write_cols=self.write_cols,
+ changelog_producer=self.changelog_producer
)
def _build_pk_merge_function(self):
@@ -230,11 +234,13 @@ class FileStoreWrite:
commit_messages = []
for (partition, bucket), writer in self.data_writers.items():
committed_files = writer.prepare_commit()
- if committed_files:
+ changelog_files = writer.prepare_changelog_commit()
+ if committed_files or changelog_files:
commit_message = CommitMessage(
partition=partition,
bucket=bucket,
- new_files=committed_files
+ new_files=committed_files,
+ changelog_files=changelog_files,
)
commit_messages.append(commit_message)
return commit_messages
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py
b/paimon-python/pypaimon/write/writer/data_writer.py
index 313caa7f6d..9f10e8a64e 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -21,7 +21,7 @@ import uuid
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple
-from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.common.options.core_options import CoreOptions, ChangelogProducer
from pypaimon.common.external_path_provider import ExternalPathProvider
from pypaimon.data.timestamp import Timestamp
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
@@ -35,7 +35,8 @@ class DataWriter(ABC):
"""Base class for data writers that handle PyArrow tables directly."""
def __init__(self, table, partition: Tuple, bucket: int, max_seq_number:
int, options: CoreOptions = None,
- write_cols: Optional[List[str]] = None):
+ write_cols: Optional[List[str]] = None,
+ changelog_producer: ChangelogProducer =
ChangelogProducer.NONE):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
@@ -61,6 +62,12 @@ class DataWriter(ABC):
self.pending_data: Optional[pa.Table] = None
self.committed_files: List[DataFileMeta] = []
+ self.committed_changelog_files: List[DataFileMeta] = []
+ self.changelog_producer = changelog_producer
+ self.changelog_file_format = (
+ self.options.changelog_file_format()
+ or self.file_format
+ )
self.write_cols = write_cols
self.blob_as_descriptor = self.options.blob_as_descriptor()
@@ -68,8 +75,6 @@ class DataWriter(ABC):
self.external_path_provider: Optional[ExternalPathProvider] =
self.path_factory.create_external_path_provider(
self.partition, self.bucket
)
- # Store the current generated external path to preserve scheme in
metadata
- self._current_external_path: Optional[str] = None
# Variant shredding (static mode) — col_name → (obj_fields,
target_arrow_type)
self._variant_shredding: Dict[str, Tuple] = {}
if self.file_format == CoreOptions.FILE_FORMAT_PARQUET \
@@ -111,6 +116,9 @@ class DataWriter(ABC):
return self.committed_files.copy()
+ def prepare_changelog_commit(self) -> List[DataFileMeta]:
+ return self.committed_changelog_files.copy()
+
def close(self):
try:
if self.pending_data is not None and self.pending_data.num_rows >
0:
@@ -130,16 +138,14 @@ class DataWriter(ABC):
Abort all writers and clean up resources. This method should be called
when an error occurs
during writing. It deletes any files that were written and cleans up
resources.
"""
- # Delete any files that were written
- for file_meta in self.committed_files:
+ # Delete any files that were written (data + changelog)
+ for file_meta in self.committed_files + self.committed_changelog_files:
try:
- # Use external_path if available (contains full URL scheme),
otherwise use file_path
path_to_delete = file_meta.external_path if
file_meta.external_path else file_meta.file_path
if path_to_delete:
path_str = str(path_to_delete)
self.file_io.delete_quietly(path_str)
except Exception as e:
- # Log but don't raise - we want to clean up as much as possible
import logging
logger = logging.getLogger(__name__)
path_to_delete = file_meta.external_path if
file_meta.external_path else file_meta.file_path
@@ -148,6 +154,7 @@ class DataWriter(ABC):
# Clean up resources
self.pending_data = None
self.committed_files.clear()
+ self.committed_changelog_files.clear()
@abstractmethod
def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
@@ -177,11 +184,7 @@ class DataWriter(ABC):
file_path = self._generate_file_path(file_name)
is_external_path = self.external_path_provider is not None
- if is_external_path:
- # Use the stored external path from _generate_file_path to
preserve scheme
- external_path_str = self._current_external_path if
self._current_external_path else None
- else:
- external_path_str = None
+ external_path_str = file_path if is_external_path else None
if self._variant_shredding:
data = self._apply_variant_shredding(data)
@@ -236,6 +239,7 @@ class DataWriter(ABC):
min_seq = self.sequence_generator.start
max_seq = self.sequence_generator.current
self.sequence_generator.start = self.sequence_generator.current
+ creation_time = Timestamp.now()
self.committed_files.append(DataFileMeta.create(
file_name=file_name,
file_size=self.file_io.get_file_size(file_path),
@@ -249,17 +253,23 @@ class DataWriter(ABC):
schema_id=self.table.table_schema.id,
level=0,
extra_files=[],
- creation_time=Timestamp.now(),
+ creation_time=creation_time,
delete_row_count=0,
file_source=0,
value_stats_cols=None if value_stats_enabled else [],
- external_path=external_path_str, # Set external path if using
external paths
+ external_path=external_path_str,
first_row_id=None,
write_cols=self.write_cols,
- # None means all columns in the table have been written
file_path=file_path,
))
+ if self.changelog_producer == ChangelogProducer.INPUT:
+ self._write_changelog_file(
+ data, min_key, max_key, key_stats, value_stats,
+ min_seq, max_seq, creation_time,
+ value_stats_enabled, external_path_str is not None,
+ )
+
def _apply_variant_shredding(self, data: pa.Table) -> pa.Table:
"""Transform VARIANT columns into shredded Parquet format.
@@ -285,11 +295,54 @@ class DataWriter(ABC):
return data
return pa.Table.from_arrays(columns, schema=pa.schema(fields))
+ def _write_changelog_file(self, data, min_key, max_key, key_stats,
value_stats,
+ min_seq, max_seq, creation_time,
+ value_stats_enabled, is_external):
+ cl_fmt = self.changelog_file_format
+ changelog_file_name = f"changelog-{uuid.uuid4()}-0.{cl_fmt}"
+ changelog_file_path = self._generate_file_path(changelog_file_name)
+
+ changelog_external_path = changelog_file_path if is_external else None
+
+ if cl_fmt == CoreOptions.FILE_FORMAT_PARQUET:
+ self.file_io.write_parquet(changelog_file_path, data,
compression=self.compression,
+ zstd_level=self.zstd_level)
+ elif cl_fmt == CoreOptions.FILE_FORMAT_ORC:
+ self.file_io.write_orc(changelog_file_path, data,
compression=self.compression,
+ zstd_level=self.zstd_level)
+ elif cl_fmt == CoreOptions.FILE_FORMAT_AVRO:
+ self.file_io.write_avro(changelog_file_path, data,
compression=self.compression,
+ zstd_level=self.zstd_level)
+ else:
+ raise ValueError(f"Unsupported changelog file format: {cl_fmt}. "
+ f"Supported formats: parquet, orc, avro.")
+
+ self.committed_changelog_files.append(DataFileMeta.create(
+ file_name=changelog_file_name,
+ file_size=self.file_io.get_file_size(changelog_file_path),
+ row_count=data.num_rows,
+ min_key=GenericRow(min_key, self.trimmed_primary_keys_fields),
+ max_key=GenericRow(max_key, self.trimmed_primary_keys_fields),
+ key_stats=key_stats,
+ value_stats=value_stats,
+ min_sequence_number=min_seq,
+ max_sequence_number=max_seq,
+ schema_id=self.table.table_schema.id,
+ level=0,
+ extra_files=[],
+ creation_time=creation_time,
+ delete_row_count=0,
+ file_source=0,
+ value_stats_cols=None if value_stats_enabled else [],
+ external_path=changelog_external_path,
+ first_row_id=None,
+ write_cols=self.write_cols,
+ file_path=changelog_file_path,
+ ))
+
def _generate_file_path(self, file_name: str) -> str:
if self.external_path_provider:
- external_path =
self.external_path_provider.get_next_external_data_path(file_name)
- self._current_external_path = external_path
- return external_path
+ return
self.external_path_provider.get_next_external_data_path(file_name)
bucket_path = self.path_factory.bucket_path(self.partition,
self.bucket)
return f"{bucket_path.rstrip('/')}/{file_name}"
diff --git a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
index 01216b36cd..444e88332b 100644
--- a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
+++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
@@ -21,7 +21,7 @@ from typing import Dict, List, Optional, Tuple
import pyarrow as pa
-from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.common.options.core_options import CoreOptions, ChangelogProducer
from pypaimon.data.timestamp import Timestamp
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
@@ -51,8 +51,10 @@ class DedicatedFormatWriter(DataWriter):
CHECK_ROLLING_RECORD_CNT = 1000
def __init__(self, table, partition: Tuple, bucket: int, max_seq_number:
int, options: CoreOptions = None,
- write_cols: Optional[List[str]] = None, blob_consumer:
Optional[BlobConsumer] = None):
- super().__init__(table, partition, bucket, max_seq_number, options,
write_cols=write_cols)
+ write_cols: Optional[List[str]] = None, blob_consumer:
Optional[BlobConsumer] = None,
+ changelog_producer: ChangelogProducer =
ChangelogProducer.NONE):
+ super().__init__(table, partition, bucket, max_seq_number, options,
write_cols=write_cols,
+ changelog_producer=changelog_producer)
# Determine blob columns from table schema
self.blob_column_names = self._get_blob_columns_from_schema()
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index 64f6003a06..5200dd2078 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -23,6 +23,7 @@ import pyarrow.compute as pc
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.read.reader.deduplicate_merge_function import \
DeduplicateMergeFunction
+from pypaimon.common.options.core_options import ChangelogProducer
from pypaimon.table.row.key_value import KeyValue
from pypaimon.write.writer.data_writer import DataWriter
@@ -40,9 +41,10 @@ class KeyValueDataWriter(DataWriter):
"""
def __init__(self, table, partition, bucket, max_seq_number,
- options=None, write_cols=None, merge_function=None):
+ options=None, write_cols=None, merge_function=None,
+ changelog_producer=ChangelogProducer.NONE):
super().__init__(table, partition, bucket, max_seq_number,
- options, write_cols)
+ options, write_cols, changelog_producer)
# Defaults to deduplicate so direct callers (tests / future code
# paths that don't go through FileStoreWrite) don't accidentally
# skip the merge step entirely.