This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new df4b475ec6 [python] Generate input changelogs from Python writer 
(#7739)
df4b475ec6 is described below

commit df4b475ec6cbc7ede22b61c84edc79103c56c584
Author: junmuz <[email protected]>
AuthorDate: Sat Jun 6 09:36:29 2026 +0100

    [python] Generate input changelogs from Python writer (#7739)
---
 .../pypaimon/common/options/core_options.py        |  11 +
 paimon-python/pypaimon/schema/schema.py            |   7 +
 .../pypaimon/tests/file_store_commit_test.py       |   1 +
 .../pypaimon/tests/reader_append_only_test.py      |   1 +
 .../tests/write/changelog_producer_test.py         | 375 +++++++++++++++++++++
 paimon-python/pypaimon/write/commit_message.py     |   3 +-
 paimon-python/pypaimon/write/file_store_commit.py  |  73 +++-
 paimon-python/pypaimon/write/file_store_write.py   |  14 +-
 paimon-python/pypaimon/write/writer/data_writer.py |  91 +++--
 .../write/writer/dedicated_format_writer.py        |   8 +-
 .../pypaimon/write/writer/key_value_data_writer.py |   6 +-
 11 files changed, 552 insertions(+), 38 deletions(-)

diff --git a/paimon-python/pypaimon/common/options/core_options.py 
b/paimon-python/pypaimon/common/options/core_options.py
index 874b888faf..e84ea6c4e1 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -470,6 +470,14 @@ class CoreOptions:
                           "Options: none, input, full-compaction, lookup.")
     )
 
+    CHANGELOG_FILE_FORMAT: ConfigOption[str] = (
+        ConfigOptions.key("changelog-file.format")
+        .string_type()
+        .no_default_value()
+        .with_description("Specify the file format of changelog files. "
+                          "Currently parquet, avro and orc are supported.")
+    )
+
     MERGE_ENGINE: ConfigOption[MergeEngine] = (
         ConfigOptions.key("merge-engine")
         .enum_type(MergeEngine)
@@ -959,6 +967,9 @@ class CoreOptions:
     def changelog_producer(self, default=None):
         return self.options.get(CoreOptions.CHANGELOG_PRODUCER, default)
 
+    def changelog_file_format(self, default=None):
+        return self.options.get(CoreOptions.CHANGELOG_FILE_FORMAT, default)
+
     def merge_engine(self, default=None):
         return self.options.get(CoreOptions.MERGE_ENGINE, default)
 
diff --git a/paimon-python/pypaimon/schema/schema.py 
b/paimon-python/pypaimon/schema/schema.py
index f3a63c88e1..3354afebc8 100644
--- a/paimon-python/pypaimon/schema/schema.py
+++ b/paimon-python/pypaimon/schema/schema.py
@@ -48,6 +48,13 @@ class Schema:
         self.options = options if options is not None else {}
         self.comment = comment
 
+        changelog_producer = 
self.options.get(CoreOptions.CHANGELOG_PRODUCER.key(), 'none')
+        if changelog_producer != 'none' and not self.primary_keys:
+            raise ValueError(
+                f"Cannot set 'changelog-producer' to '{changelog_producer}' on 
a table without primary keys. "
+                f"Changelog producer requires primary keys to be defined."
+            )
+
     @staticmethod
     def from_pyarrow_schema(pa_schema: pa.Schema, partition_keys: 
Optional[List[str]] = None,
                             primary_keys: Optional[List[str]] = None, options: 
Optional[Dict] = None,
diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py 
b/paimon-python/pypaimon/tests/file_store_commit_test.py
index ec13d4041a..8b6b892bae 100644
--- a/paimon-python/pypaimon/tests/file_store_commit_test.py
+++ b/paimon-python/pypaimon/tests/file_store_commit_test.py
@@ -435,6 +435,7 @@ class TestFileStoreCommit(unittest.TestCase):
             retry_result=None,
             commit_kind="APPEND",
             commit_entries=[commit_entry],
+            changelog_entries=[],
             commit_identifier=11,
             latest_snapshot=latest_snapshot
         )
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py 
b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 68d268d655..34509db74a 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -635,6 +635,7 @@ class AoReaderTest(unittest.TestCase):
             RetryResult(None),
             "APPEND",
             commit_entries,
+            [],
             BATCH_COMMIT_IDENTIFIER,
             latest_snapshot)
         self.assertTrue(success.is_success())
diff --git a/paimon-python/pypaimon/tests/write/changelog_producer_test.py 
b/paimon-python/pypaimon/tests/write/changelog_producer_test.py
new file mode 100644
index 0000000000..b789593038
--- /dev/null
+++ b/paimon-python/pypaimon/tests/write/changelog_producer_test.py
@@ -0,0 +1,375 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import glob
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+import json
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.schema.data_types import DataField, AtomicType
+from pypaimon.write.commit_message import CommitMessage
+
+
+class ChangelogProducerTest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+        cls.pk_schema = pa.schema([
+            pa.field('user_id', pa.int32(), nullable=False),
+            ('item_id', pa.int64()),
+            ('behavior', pa.string()),
+            pa.field('dt', pa.string(), nullable=False)
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create_table(self, table_name, options=None):
+        schema = Schema.from_pyarrow_schema(
+            self.pk_schema,
+            partition_keys=['dt'],
+            primary_keys=['user_id', 'dt'],
+            options=options or {}
+        )
+        self.catalog.create_table(f'default.{table_name}', schema, False)
+        return self.catalog.get_table(f'default.{table_name}')
+
+    def _sample_data(self):
+        return pa.Table.from_pydict({
+            'user_id': [1, 2, 3],
+            'item_id': [101, 102, 103],
+            'behavior': ['click', 'buy', 'view'],
+            'dt': ['p1', 'p1', 'p1']
+        }, schema=self.pk_schema)
+
+    def test_commit_message_with_changelog(self):
+        msg = CommitMessage(partition=('p1',), bucket=0, new_files=[], 
changelog_files=[])
+        self.assertTrue(msg.is_empty())
+
+        msg2 = CommitMessage(partition=('p1',), bucket=0, new_files=['fake'])
+        self.assertFalse(msg2.is_empty())
+        self.assertEqual(msg2.changelog_files, [])
+
+    def test_full_compaction_and_lookup_no_changelog_from_writer(self):
+        """FULL_COMPACTION and LOOKUP rely on dedicated compaction for 
changelog,
+        so the Python writer should not produce changelog files for these 
modes."""
+        for mode in ['full-compaction', 'lookup']:
+            table = self._create_table(
+                f'test_no_changelog_{mode.replace("-", "_")}',
+                options={'changelog-producer': mode, 'bucket': '1'}
+            )
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+
+            table_write.write_arrow(self._sample_data())
+            table_commit.commit(table_write.prepare_commit())
+
+            bucket_dir = os.path.join(
+                self.warehouse, 'default.db',
+                f'test_no_changelog_{mode.replace("-", "_")}', 'dt=p1', 
'bucket-0')
+            changelog_files = glob.glob(os.path.join(bucket_dir, 
'changelog-*'))
+            self.assertEqual(len(changelog_files), 0,
+                             f"Writer should not produce changelog files for 
{mode}")
+
+            table_write.close()
+            table_commit.close()
+
+    def test_none_mode_no_changelog(self):
+        table = self._create_table(
+            'test_none_mode',
+            options={'changelog-producer': 'none', 'bucket': '1'}
+        )
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        table_write.write_arrow(self._sample_data())
+        table_commit.commit(table_write.prepare_commit())
+
+        snapshot_path = glob.glob(
+            os.path.join(self.warehouse, 'default.db', 'test_none_mode', 
'snapshot', 'snapshot-*'))
+        self.assertTrue(len(snapshot_path) > 0)
+
+        snapshot_json = open(snapshot_path[0]).read()
+        snapshot = json.loads(snapshot_json)
+        self.assertNotIn('changelogManifestList', snapshot)
+
+        table_write.close()
+        table_commit.close()
+
+    def test_input_mode_produces_changelog_files(self):
+        table = self._create_table(
+            'test_input_files',
+            options={'changelog-producer': 'input', 'bucket': '1'}
+        )
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        table_write.write_arrow(self._sample_data())
+        table_commit.commit(table_write.prepare_commit())
+
+        bucket_dir = os.path.join(
+            self.warehouse, 'default.db', 'test_input_files', 'dt=p1', 
'bucket-0')
+        data_files = glob.glob(os.path.join(bucket_dir, 'data-*'))
+        changelog_files = glob.glob(os.path.join(bucket_dir, 'changelog-*'))
+        self.assertTrue(len(data_files) > 0, "Should have data files")
+        self.assertTrue(len(changelog_files) > 0, "Should have changelog 
files")
+
+        table_write.close()
+        table_commit.close()
+
+    def test_input_mode_snapshot_has_changelog_manifest(self):
+        table = self._create_table(
+            'test_input_snapshot',
+            options={'changelog-producer': 'input', 'bucket': '1'}
+        )
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        table_write.write_arrow(self._sample_data())
+        table_commit.commit(table_write.prepare_commit())
+
+        snapshot_path = glob.glob(
+            os.path.join(self.warehouse, 'default.db', 'test_input_snapshot',
+                         'snapshot', 'snapshot-*'))
+        self.assertTrue(len(snapshot_path) > 0)
+
+        snapshot_json = open(snapshot_path[0]).read()
+        snapshot = json.loads(snapshot_json)
+        self.assertIn('changelogManifestList', snapshot)
+        self.assertIsNotNone(snapshot['changelogManifestList'])
+        self.assertIn('changelogRecordCount', snapshot)
+        self.assertEqual(snapshot['changelogRecordCount'], 3)
+        self.assertIn('changelogManifestListSize', snapshot)
+
+        table_write.close()
+        table_commit.close()
+
+    def test_input_mode_changelog_manifest_readable(self):
+        table = self._create_table(
+            'test_input_readable',
+            options={'changelog-producer': 'input', 'bucket': '1'}
+        )
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        table_write.write_arrow(self._sample_data())
+        table_commit.commit(table_write.prepare_commit())
+
+        from pypaimon.snapshot.snapshot_manager import SnapshotManager
+        snapshot_manager = SnapshotManager(table.file_io, table.table_path)
+        snapshot = snapshot_manager.get_latest_snapshot()
+
+        self.assertIsNotNone(snapshot.changelog_manifest_list)
+
+        manifest_list_manager = ManifestListManager(table)
+        changelog_manifests = manifest_list_manager.read_changelog(snapshot)
+        self.assertTrue(len(changelog_manifests) > 0)
+
+        total_changelog_added = sum(m.num_added_files for m in 
changelog_manifests)
+        self.assertTrue(total_changelog_added > 0)
+
+        table_write.close()
+        table_commit.close()
+
+    def test_input_mode_multiple_commits(self):
+        table = self._create_table(
+            'test_input_multi',
+            options={'changelog-producer': 'input', 'bucket': '1'}
+        )
+
+        # First commit
+        write_builder1 = table.new_batch_write_builder()
+        table_write1 = write_builder1.new_write()
+        table_commit1 = write_builder1.new_commit()
+        table_write1.write_arrow(self._sample_data())
+        table_commit1.commit(table_write1.prepare_commit())
+        table_write1.close()
+        table_commit1.close()
+
+        # Second commit with different data
+        write_builder2 = table.new_batch_write_builder()
+        table_write2 = write_builder2.new_write()
+        table_commit2 = write_builder2.new_commit()
+        data2 = pa.Table.from_pydict({
+            'user_id': [4, 5],
+            'item_id': [104, 105],
+            'behavior': ['click', 'buy'],
+            'dt': ['p1', 'p1']
+        }, schema=self.pk_schema)
+        table_write2.write_arrow(data2)
+        table_commit2.commit(table_write2.prepare_commit())
+        table_write2.close()
+        table_commit2.close()
+
+        from pypaimon.snapshot.snapshot_manager import SnapshotManager
+        snapshot_manager = SnapshotManager(table.file_io, table.table_path)
+        snapshot = snapshot_manager.get_latest_snapshot()
+        self.assertEqual(snapshot.id, 2)
+        self.assertIsNotNone(snapshot.changelog_manifest_list)
+        self.assertEqual(snapshot.changelog_record_count, 2)
+
+    def test_abort_cleans_up_changelog_files(self):
+        table = self._create_table(
+            'test_input_abort',
+            options={'changelog-producer': 'input', 'bucket': '1'}
+        )
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        table_write.write_arrow(self._sample_data())
+        commit_messages = table_write.prepare_commit()
+
+        bucket_dir = os.path.join(
+            self.warehouse, 'default.db', 'test_input_abort', 'dt=p1', 
'bucket-0')
+        changelog_files_before = glob.glob(os.path.join(bucket_dir, 
'changelog-*'))
+        self.assertTrue(len(changelog_files_before) > 0)
+
+        table_commit.abort(commit_messages)
+
+        data_files_after = glob.glob(os.path.join(bucket_dir, 'data-*'))
+        changelog_files_after = glob.glob(os.path.join(bucket_dir, 
'changelog-*'))
+        self.assertEqual(len(data_files_after), 0, "Data files should be 
cleaned up after abort")
+        self.assertEqual(len(changelog_files_after), 0, "Changelog files 
should be cleaned up after abort")
+
+        table_write.close()
+        table_commit.close()
+
+    def test_reject_changelog_producer_on_append_only_table(self):
+        append_schema = pa.schema([
+            ('user_id', pa.int32()),
+            ('item_id', pa.int64()),
+            ('behavior', pa.string()),
+            ('dt', pa.string())
+        ])
+        for mode in ['input', 'full-compaction', 'lookup']:
+            with self.assertRaises(ValueError, msg=f"Should reject 
changelog-producer={mode} without PKs"):
+                Schema.from_pyarrow_schema(
+                    append_schema,
+                    partition_keys=['dt'],
+                    options={'changelog-producer': mode, 'bucket': '1'}
+                )
+
+    def test_reject_changelog_producer_on_direct_schema_construction(self):
+        for mode in ['input', 'full-compaction', 'lookup']:
+            with self.assertRaises(ValueError, msg=f"Should reject 
changelog-producer={mode} without PKs"):
+                Schema(
+                    fields=[
+                        DataField(0, 'user_id', AtomicType('INT')),
+                        DataField(1, 'item_id', AtomicType('BIGINT')),
+                        DataField(2, 'behavior', AtomicType('STRING')),
+                        DataField(3, 'dt', AtomicType('STRING')),
+                    ],
+                    partition_keys=['dt'],
+                    primary_keys=[],
+                    options={'changelog-producer': mode, 'bucket': '1'}
+                )
+
+    def test_changelog_producer_none_allowed_on_append_only_table(self):
+        append_schema = pa.schema([
+            ('user_id', pa.int32()),
+            ('item_id', pa.int64()),
+            ('behavior', pa.string()),
+            ('dt', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(
+            append_schema,
+            partition_keys=['dt'],
+            options={'changelog-producer': 'none', 'bucket': '1'}
+        )
+        self.assertIsNotNone(schema)
+
+    def test_input_mode_changelog_inherits_data_file_format(self):
+        table = self._create_table(
+            'test_input_changelog_format',
+            options={'changelog-producer': 'input', 'bucket': '1', 
'file.format': 'orc'}
+        )
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        table_write.write_arrow(self._sample_data())
+        table_commit.commit(table_write.prepare_commit())
+
+        bucket_dir = os.path.join(
+            self.warehouse, 'default.db', 'test_input_changelog_format', 
'dt=p1', 'bucket-0')
+        changelog_files = glob.glob(os.path.join(bucket_dir, 'changelog-*'))
+        self.assertTrue(len(changelog_files) > 0, "Should have changelog 
files")
+        for f in changelog_files:
+            self.assertTrue(f.endswith('.orc'),
+                            f"Changelog file should inherit data file format 
(orc), got {f}")
+
+        data_files = glob.glob(os.path.join(bucket_dir, 'data-*'))
+        self.assertTrue(len(data_files) > 0, "Should have data files")
+        for f in data_files:
+            self.assertTrue(f.endswith('.orc'),
+                            f"Data file should use orc format, got {f}")
+
+        table_write.close()
+        table_commit.close()
+
+    def test_input_mode_changelog_respects_changelog_file_format(self):
+        table = self._create_table(
+            'test_input_cl_file_fmt',
+            options={'changelog-producer': 'input', 'bucket': '1',
+                     'file.format': 'parquet', 'changelog-file.format': 'orc'}
+        )
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        table_write.write_arrow(self._sample_data())
+        table_commit.commit(table_write.prepare_commit())
+
+        bucket_dir = os.path.join(
+            self.warehouse, 'default.db', 'test_input_cl_file_fmt', 'dt=p1', 
'bucket-0')
+        changelog_files = glob.glob(os.path.join(bucket_dir, 'changelog-*'))
+        self.assertTrue(len(changelog_files) > 0, "Should have changelog 
files")
+        for f in changelog_files:
+            self.assertTrue(f.endswith('.orc'),
+                            f"Changelog file should use orc format, got {f}")
+
+        data_files = glob.glob(os.path.join(bucket_dir, 'data-*'))
+        self.assertTrue(len(data_files) > 0, "Should have data files")
+        for f in data_files:
+            self.assertTrue(f.endswith('.parquet'),
+                            f"Data file should use parquet format, got {f}")
+
+        table_write.close()
+        table_commit.close()
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/write/commit_message.py 
b/paimon-python/pypaimon/write/commit_message.py
index 552df0000f..a8cc193727 100644
--- a/paimon-python/pypaimon/write/commit_message.py
+++ b/paimon-python/pypaimon/write/commit_message.py
@@ -31,6 +31,7 @@ class CommitMessage:
     new_files: List[DataFileMeta]
     check_from_snapshot: Optional[int] = -1
     index_deletes: List['IndexManifestEntry'] = field(default_factory=list)
+    changelog_files: List[DataFileMeta] = field(default_factory=list)
 
     def is_empty(self):
-        return not self.new_files and not self.index_deletes
+        return not self.new_files and not self.index_deletes and not 
self.changelog_files
diff --git a/paimon-python/pypaimon/write/file_store_commit.py 
b/paimon-python/pypaimon/write/file_store_commit.py
index 41440ae862..49f12cfac7 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -140,8 +140,10 @@ class FileStoreCommit:
                     total_buckets=self.table.total_buckets,
                     file=file
                 ))
+        changelog_entries = self._collect_changelog_entries(commit_messages)
 
-        logger.info("Finished collecting changes, including: %d entries", 
len(commit_entries))
+        logger.info("Finished collecting changes, including: %d entries, %d 
changelog entries",
+                    len(commit_entries), len(changelog_entries))
 
         index_deletes = []
         for msg in commit_messages:
@@ -182,6 +184,7 @@ class FileStoreCommit:
         self._try_commit(commit_kind=commit_kind,
                          commit_identifier=commit_identifier,
                          commit_entries_plan=lambda snapshot: commit_entries,
+                         changelog_entries=changelog_entries,
                          detect_conflicts=detect_conflicts,
                          allow_rollback=allow_rollback,
                          index_deletes=index_deletes)
@@ -206,12 +209,15 @@ class FileStoreCommit:
         else:
             partition_filter = 
self._create_static_partition_filter(overwrite_partition, commit_messages)
 
+        changelog_entries = self._collect_changelog_entries(commit_messages)
+
         if not skip_overwrite:
             self._try_commit(
                 commit_kind="OVERWRITE",
                 commit_identifier=commit_identifier,
                 commit_entries_plan=lambda snapshot: 
self._generate_overwrite_entries(
                     snapshot, partition_filter, commit_messages),
+                changelog_entries=changelog_entries,
                 detect_conflicts=True,
                 allow_rollback=False,
             )
@@ -270,7 +276,7 @@ class FileStoreCommit:
         )
 
     def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan,
-                    detect_conflicts=False, allow_rollback=False, 
index_deletes=None):
+                    detect_conflicts=False, allow_rollback=False, 
index_deletes=None, changelog_entries=None):
 
         retry_count = 0
         retry_result = None
@@ -288,6 +294,7 @@ class FileStoreCommit:
                 retry_result=retry_result,
                 commit_kind=commit_kind,
                 commit_entries=commit_entries,
+                changelog_entries=changelog_entries or [],
                 commit_identifier=commit_identifier,
                 latest_snapshot=latest_snapshot,
                 detect_conflicts=detect_conflicts,
@@ -341,7 +348,9 @@ class FileStoreCommit:
             retry_count += 1
 
     def _try_commit_once(self, retry_result: Optional[RetryResult], 
commit_kind: str,
-                         commit_entries: List[ManifestEntry], 
commit_identifier: int,
+                         commit_entries: List[ManifestEntry],
+                         changelog_entries: List[ManifestEntry],
+                         commit_identifier: int,
                          latest_snapshot: Optional[Snapshot],
                          detect_conflicts: bool = False,
                          allow_rollback: bool = False,
@@ -381,10 +390,28 @@ class FileStoreCommit:
             first_row_id_start = self._get_next_row_id_start(latest_snapshot)
             commit_entries, next_row_id = 
self._assign_row_tracking_meta(first_row_id_start, commit_entries)
 
+        changelog_manifest_list_name = None
+        changelog_manifest_list_size = None
+        changelog_record_count = None
         try:
             new_manifest_file_meta = self._write_manifest_file(commit_entries, 
new_manifest_file)
             self.manifest_list_manager.write(delta_manifest_list, 
[new_manifest_file_meta])
 
+            # Write changelog manifest if changelog entries exist
+            if changelog_entries:
+                changelog_manifest_file = 
f"manifest-{str(uuid.uuid4())}-changelog-0"
+                changelog_manifest_file_meta = self._write_manifest_file(
+                    changelog_entries, changelog_manifest_file)
+                changelog_manifest_list_name = 
f"manifest-list-{unique_id}-changelog"
+                self.manifest_list_manager.write(
+                    changelog_manifest_list_name, 
[changelog_manifest_file_meta])
+                manifest_path = self.manifest_list_manager.manifest_path
+                changelog_manifest_list_size = 
self.table.file_io.get_file_size(
+                    f"{manifest_path}/{changelog_manifest_list_name}")
+                # kind==0 means ADD; pypaimon producers only support additions 
currently
+                changelog_record_count = sum(
+                    entry.file.row_count for entry in changelog_entries if 
entry.kind == 0)
+
             # process existing_manifest
             total_record_count = 0
             if latest_snapshot:
@@ -421,6 +448,9 @@ class FileStoreCommit:
                 schema_id=self.table.table_schema.id,
                 base_manifest_list=base_manifest_list,
                 delta_manifest_list=delta_manifest_list,
+                changelog_manifest_list=changelog_manifest_list_name,
+                changelog_manifest_list_size=changelog_manifest_list_size,
+                changelog_record_count=changelog_record_count,
                 total_record_count=total_record_count,
                 delta_record_count=delta_record_count,
                 commit_user=self.commit_user,
@@ -434,7 +464,7 @@ class FileStoreCommit:
             statistics = self._generate_partition_statistics(commit_entries)
         except Exception as e:
             self._cleanup_preparation_failure(delta_manifest_list, 
base_manifest_list,
-                                              new_index_manifest)
+                                              new_index_manifest, 
changelog_manifest_list_name)
             logger.warning(f"Exception occurs when preparing snapshot: {e}", 
exc_info=True)
             raise RuntimeError(f"Failed to prepare snapshot: {e}")
 
@@ -455,7 +485,7 @@ class FileStoreCommit:
                         commit_time_s,
                     )
                     self._cleanup_preparation_failure(delta_manifest_list, 
base_manifest_list,
-                                                      new_index_manifest)
+                                                      new_index_manifest, 
changelog_manifest_list_name)
                     return RetryResult(latest_snapshot, None)
         except Exception as e:
             # Commit exception, not sure about the situation and should not 
clean up the files
@@ -634,10 +664,25 @@ class FileStoreCommit:
 
         time.sleep(total_wait_ms / 1000.0)
 
+    def _collect_changelog_entries(self, commit_messages: List[CommitMessage]) 
-> List[ManifestEntry]:
+        changelog_entries = []
+        for msg in commit_messages:
+            partition = GenericRow(list(msg.partition), 
self.table.partition_keys_fields)
+            for file in msg.changelog_files:
+                changelog_entries.append(ManifestEntry(
+                    kind=0,
+                    partition=partition,
+                    bucket=msg.bucket,
+                    total_buckets=self.table.total_buckets,
+                    file=file
+                ))
+        return changelog_entries
+
     def _cleanup_preparation_failure(self,
                                      delta_manifest_list: Optional[str],
                                      base_manifest_list: Optional[str],
-                                     index_manifest: Optional[str] = None):
+                                     index_manifest: Optional[str] = None,
+                                     changelog_manifest_list: Optional[str] = 
None):
         try:
             manifest_path = self.manifest_list_manager.manifest_path
 
@@ -655,21 +700,31 @@ class FileStoreCommit:
             if base_manifest_list:
                 base_path = f"{manifest_path}/{base_manifest_list}"
                 self.table.file_io.delete_quietly(base_path)
+
+            if changelog_manifest_list:
+                try:
+                    changelog_manifests = 
self.manifest_list_manager.read(changelog_manifest_list)
+                    for manifest_meta in changelog_manifests:
+                        manifest_file_path = (
+                            
f"{self.manifest_file_manager.manifest_path}/{manifest_meta.file_name}")
+                        self.table.file_io.delete_quietly(manifest_file_path)
+                except Exception:
+                    pass
+                changelog_path = f"{manifest_path}/{changelog_manifest_list}"
+                self.table.file_io.delete_quietly(changelog_path)
         except Exception as e:
             logger.warning(f"Failed to clean up temporary files during 
preparation failure: {e}", exc_info=True)
 
     def abort(self, commit_messages: List[CommitMessage]):
         """Abort commit and delete files. Uses external_path if available to 
ensure proper scheme handling."""
         for message in commit_messages:
-            for file in message.new_files:
+            for file in list(message.new_files) + 
list(message.changelog_files):
                 try:
                     path_to_delete = file.external_path if file.external_path 
else file.file_path
                     if path_to_delete:
                         path_str = str(path_to_delete)
                         self.table.file_io.delete_quietly(path_str)
                 except Exception as e:
-                    import logging
-                    logger = logging.getLogger(__name__)
                     path_to_delete = file.external_path if file.external_path 
else file.file_path
                     logger.warning(f"Failed to clean up file {path_to_delete} 
during abort: {e}")
 
diff --git a/paimon-python/pypaimon/write/file_store_write.py 
b/paimon-python/pypaimon/write/file_store_write.py
index c77f88e907..ee3b96d8d7 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -47,6 +47,7 @@ class FileStoreWrite:
         self.blob_consumer = None
         self.commit_identifier = 0
         self.options = CoreOptions.copy(table.options)
+        self.changelog_producer = self.options.changelog_producer()
         if self.table.bucket_mode() == BucketMode.POSTPONE_MODE:
             self.options.set(CoreOptions.DATA_FILE_PREFIX,
                              
(f"{self.options.data_file_prefix()}-u-{commit_user}"
@@ -78,6 +79,7 @@ class FileStoreWrite:
                 options=options,
                 write_cols=self.write_cols,
                 blob_consumer=self.blob_consumer,
+                changelog_producer=self.changelog_producer,
             )
         elif self._has_vector_columns() and options.with_vector_format():
             return DataVectorWriter(
@@ -95,7 +97,8 @@ class FileStoreWrite:
                 bucket=bucket,
                 max_seq_number=max_seq_number(),
                 options=options,
-                merge_function=self._build_pk_merge_function())
+                merge_function=self._build_pk_merge_function(),
+                changelog_producer=self.changelog_producer)
         else:
             seq_number = 0 if self.table.bucket_mode() == 
BucketMode.BUCKET_UNAWARE else max_seq_number()
             return AppendOnlyDataWriter(
@@ -104,7 +107,8 @@ class FileStoreWrite:
                 bucket=bucket,
                 max_seq_number=seq_number,
                 options=options,
-                write_cols=self.write_cols
+                write_cols=self.write_cols,
+                changelog_producer=self.changelog_producer
             )
 
     def _build_pk_merge_function(self):
@@ -230,11 +234,13 @@ class FileStoreWrite:
         commit_messages = []
         for (partition, bucket), writer in self.data_writers.items():
             committed_files = writer.prepare_commit()
-            if committed_files:
+            changelog_files = writer.prepare_changelog_commit()
+            if committed_files or changelog_files:
                 commit_message = CommitMessage(
                     partition=partition,
                     bucket=bucket,
-                    new_files=committed_files
+                    new_files=committed_files,
+                    changelog_files=changelog_files,
                 )
                 commit_messages.append(commit_message)
         return commit_messages
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py 
b/paimon-python/pypaimon/write/writer/data_writer.py
index 313caa7f6d..9f10e8a64e 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -21,7 +21,7 @@ import uuid
 from abc import ABC, abstractmethod
 from typing import Dict, List, Optional, Tuple
 
-from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.common.options.core_options import CoreOptions, ChangelogProducer
 from pypaimon.common.external_path_provider import ExternalPathProvider
 from pypaimon.data.timestamp import Timestamp
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
@@ -35,7 +35,8 @@ class DataWriter(ABC):
     """Base class for data writers that handle PyArrow tables directly."""
 
     def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: 
int, options: CoreOptions = None,
-                 write_cols: Optional[List[str]] = None):
+                 write_cols: Optional[List[str]] = None,
+                 changelog_producer: ChangelogProducer = 
ChangelogProducer.NONE):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
@@ -61,6 +62,12 @@ class DataWriter(ABC):
 
         self.pending_data: Optional[pa.Table] = None
         self.committed_files: List[DataFileMeta] = []
+        self.committed_changelog_files: List[DataFileMeta] = []
+        self.changelog_producer = changelog_producer
+        self.changelog_file_format = (
+            self.options.changelog_file_format()
+            or self.file_format
+        )
         self.write_cols = write_cols
         self.blob_as_descriptor = self.options.blob_as_descriptor()
 
@@ -68,8 +75,6 @@ class DataWriter(ABC):
         self.external_path_provider: Optional[ExternalPathProvider] = 
self.path_factory.create_external_path_provider(
             self.partition, self.bucket
         )
-        # Store the current generated external path to preserve scheme in 
metadata
-        self._current_external_path: Optional[str] = None
         # Variant shredding (static mode) — col_name → (obj_fields, 
target_arrow_type)
         self._variant_shredding: Dict[str, Tuple] = {}
         if self.file_format == CoreOptions.FILE_FORMAT_PARQUET \
@@ -111,6 +116,9 @@ class DataWriter(ABC):
 
         return self.committed_files.copy()
 
+    def prepare_changelog_commit(self) -> List[DataFileMeta]:
+        return self.committed_changelog_files.copy()
+
     def close(self):
         try:
             if self.pending_data is not None and self.pending_data.num_rows > 
0:
@@ -130,16 +138,14 @@ class DataWriter(ABC):
         Abort all writers and clean up resources. This method should be called 
when an error occurs
         during writing. It deletes any files that were written and cleans up 
resources.
         """
-        # Delete any files that were written
-        for file_meta in self.committed_files:
+        # Delete any files that were written (data + changelog)
+        for file_meta in self.committed_files + self.committed_changelog_files:
             try:
-                # Use external_path if available (contains full URL scheme), 
otherwise use file_path
                 path_to_delete = file_meta.external_path if 
file_meta.external_path else file_meta.file_path
                 if path_to_delete:
                     path_str = str(path_to_delete)
                     self.file_io.delete_quietly(path_str)
             except Exception as e:
-                # Log but don't raise - we want to clean up as much as possible
                 import logging
                 logger = logging.getLogger(__name__)
                 path_to_delete = file_meta.external_path if 
file_meta.external_path else file_meta.file_path
@@ -148,6 +154,7 @@ class DataWriter(ABC):
         # Clean up resources
         self.pending_data = None
         self.committed_files.clear()
+        self.committed_changelog_files.clear()
 
     @abstractmethod
     def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
@@ -177,11 +184,7 @@ class DataWriter(ABC):
         file_path = self._generate_file_path(file_name)
 
         is_external_path = self.external_path_provider is not None
-        if is_external_path:
-            # Use the stored external path from _generate_file_path to 
preserve scheme
-            external_path_str = self._current_external_path if 
self._current_external_path else None
-        else:
-            external_path_str = None
+        external_path_str = file_path if is_external_path else None
 
         if self._variant_shredding:
             data = self._apply_variant_shredding(data)
@@ -236,6 +239,7 @@ class DataWriter(ABC):
         min_seq = self.sequence_generator.start
         max_seq = self.sequence_generator.current
         self.sequence_generator.start = self.sequence_generator.current
+        creation_time = Timestamp.now()
         self.committed_files.append(DataFileMeta.create(
             file_name=file_name,
             file_size=self.file_io.get_file_size(file_path),
@@ -249,17 +253,23 @@ class DataWriter(ABC):
             schema_id=self.table.table_schema.id,
             level=0,
             extra_files=[],
-            creation_time=Timestamp.now(),
+            creation_time=creation_time,
             delete_row_count=0,
             file_source=0,
             value_stats_cols=None if value_stats_enabled else [],
-            external_path=external_path_str,  # Set external path if using 
external paths
+            external_path=external_path_str,
             first_row_id=None,
             write_cols=self.write_cols,
-            # None means all columns in the table have been written
             file_path=file_path,
         ))
 
+        if self.changelog_producer == ChangelogProducer.INPUT:
+            self._write_changelog_file(
+                data, min_key, max_key, key_stats, value_stats,
+                min_seq, max_seq, creation_time,
+                value_stats_enabled, external_path_str is not None,
+            )
+
     def _apply_variant_shredding(self, data: pa.Table) -> pa.Table:
         """Transform VARIANT columns into shredded Parquet format.
 
@@ -285,11 +295,54 @@ class DataWriter(ABC):
             return data
         return pa.Table.from_arrays(columns, schema=pa.schema(fields))
 
+    def _write_changelog_file(self, data, min_key, max_key, key_stats, 
value_stats,
+                              min_seq, max_seq, creation_time,
+                              value_stats_enabled, is_external):
+        cl_fmt = self.changelog_file_format
+        changelog_file_name = f"changelog-{uuid.uuid4()}-0.{cl_fmt}"
+        changelog_file_path = self._generate_file_path(changelog_file_name)
+
+        changelog_external_path = changelog_file_path if is_external else None
+
+        if cl_fmt == CoreOptions.FILE_FORMAT_PARQUET:
+            self.file_io.write_parquet(changelog_file_path, data, 
compression=self.compression,
+                                       zstd_level=self.zstd_level)
+        elif cl_fmt == CoreOptions.FILE_FORMAT_ORC:
+            self.file_io.write_orc(changelog_file_path, data, 
compression=self.compression,
+                                   zstd_level=self.zstd_level)
+        elif cl_fmt == CoreOptions.FILE_FORMAT_AVRO:
+            self.file_io.write_avro(changelog_file_path, data, 
compression=self.compression,
+                                    zstd_level=self.zstd_level)
+        else:
+            raise ValueError(f"Unsupported changelog file format: {cl_fmt}. "
+                             f"Supported formats: parquet, orc, avro.")
+
+        self.committed_changelog_files.append(DataFileMeta.create(
+            file_name=changelog_file_name,
+            file_size=self.file_io.get_file_size(changelog_file_path),
+            row_count=data.num_rows,
+            min_key=GenericRow(min_key, self.trimmed_primary_keys_fields),
+            max_key=GenericRow(max_key, self.trimmed_primary_keys_fields),
+            key_stats=key_stats,
+            value_stats=value_stats,
+            min_sequence_number=min_seq,
+            max_sequence_number=max_seq,
+            schema_id=self.table.table_schema.id,
+            level=0,
+            extra_files=[],
+            creation_time=creation_time,
+            delete_row_count=0,
+            file_source=0,
+            value_stats_cols=None if value_stats_enabled else [],
+            external_path=changelog_external_path,
+            first_row_id=None,
+            write_cols=self.write_cols,
+            file_path=changelog_file_path,
+        ))
+
     def _generate_file_path(self, file_name: str) -> str:
         if self.external_path_provider:
-            external_path = 
self.external_path_provider.get_next_external_data_path(file_name)
-            self._current_external_path = external_path
-            return external_path
+            return 
self.external_path_provider.get_next_external_data_path(file_name)
 
         bucket_path = self.path_factory.bucket_path(self.partition, 
self.bucket)
         return f"{bucket_path.rstrip('/')}/{file_name}"
diff --git a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py 
b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
index 01216b36cd..444e88332b 100644
--- a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
+++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
@@ -21,7 +21,7 @@ from typing import Dict, List, Optional, Tuple
 
 import pyarrow as pa
 
-from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.common.options.core_options import CoreOptions, ChangelogProducer
 from pypaimon.data.timestamp import Timestamp
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
@@ -51,8 +51,10 @@ class DedicatedFormatWriter(DataWriter):
     CHECK_ROLLING_RECORD_CNT = 1000
 
     def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: 
int, options: CoreOptions = None,
-                 write_cols: Optional[List[str]] = None, blob_consumer: 
Optional[BlobConsumer] = None):
-        super().__init__(table, partition, bucket, max_seq_number, options, 
write_cols=write_cols)
+                 write_cols: Optional[List[str]] = None, blob_consumer: 
Optional[BlobConsumer] = None,
+                 changelog_producer: ChangelogProducer = 
ChangelogProducer.NONE):
+        super().__init__(table, partition, bucket, max_seq_number, options, 
write_cols=write_cols,
+                         changelog_producer=changelog_producer)
 
         # Determine blob columns from table schema
         self.blob_column_names = self._get_blob_columns_from_schema()
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py 
b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index 64f6003a06..5200dd2078 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -23,6 +23,7 @@ import pyarrow.compute as pc
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.read.reader.deduplicate_merge_function import \
     DeduplicateMergeFunction
+from pypaimon.common.options.core_options import ChangelogProducer
 from pypaimon.table.row.key_value import KeyValue
 from pypaimon.write.writer.data_writer import DataWriter
 
@@ -40,9 +41,10 @@ class KeyValueDataWriter(DataWriter):
     """
 
     def __init__(self, table, partition, bucket, max_seq_number,
-                 options=None, write_cols=None, merge_function=None):
+                 options=None, write_cols=None, merge_function=None,
+                 changelog_producer=ChangelogProducer.NONE):
         super().__init__(table, partition, bucket, max_seq_number,
-                         options, write_cols)
+                         options, write_cols, changelog_producer)
         # Defaults to deduplicate so direct callers (tests / future code
         # paths that don't go through FileStoreWrite) don't accidentally
         # skip the merge step entirely.


Reply via email to