This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 13e370a  feat: support input changelog for primary key writes (#318)
13e370a is described below

commit 13e370ae5cb450351dcbdabb60d3ea7cc79847ab
Author: QuakeWang <[email protected]>
AuthorDate: Wed May 20 16:13:00 2026 +0800

    feat: support input changelog for primary key writes (#318)
---
 crates/integrations/datafusion/tests/pk_tables.rs |  32 +-
 crates/paimon/src/spec/core_options.rs            | 166 ++++++-
 crates/paimon/src/table/commit_message.rs         |   3 +
 crates/paimon/src/table/kv_file_writer.rs         | 260 ++++++++---
 crates/paimon/src/table/mod.rs                    |   1 +
 crates/paimon/src/table/prepared_files.rs         |  33 ++
 crates/paimon/src/table/table_commit.rs           | 126 +++++-
 crates/paimon/src/table/table_write.rs            | 512 +++++++++++++++++++++-
 8 files changed, 1032 insertions(+), 101 deletions(-)

diff --git a/crates/integrations/datafusion/tests/pk_tables.rs 
b/crates/integrations/datafusion/tests/pk_tables.rs
index fa0bd6f..bf1a317 100644
--- a/crates/integrations/datafusion/tests/pk_tables.rs
+++ b/crates/integrations/datafusion/tests/pk_tables.rs
@@ -1460,9 +1460,9 @@ async fn test_pk_partitioned_multi_bucket() {
 
 // ======================= Error Cases =======================
 
-/// PK table with changelog-producer=input should be rejected.
+/// PK table with changelog-producer=input should write through DataFusion SQL.
 #[tokio::test]
-async fn test_pk_reject_changelog_producer_input() {
+async fn test_pk_input_changelog_write_read() {
     let (_tmp, sql_context) = setup_sql_context().await;
 
     sql_context
@@ -1475,18 +1475,24 @@ async fn test_pk_reject_changelog_producer_input() {
         .await
         .unwrap();
 
-    let result = sql_context
-        .sql("INSERT INTO paimon.test_db.t_changelog VALUES (1, 'alice')")
-        .await;
+    sql_context
+        .sql(
+            "INSERT INTO paimon.test_db.t_changelog VALUES
+                (1, 'alice'), (1, 'bob'), (2, 'carol')",
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
 
-    let is_err = match result {
-        Err(_) => true,
-        Ok(df) => df.collect().await.is_err(),
-    };
-    assert!(
-        is_err,
-        "PK table with changelog-producer=input should reject writes"
-    );
+    let rows = collect_id_name(
+        &sql_context,
+        "SELECT id, name FROM paimon.test_db.t_changelog ORDER BY id",
+    )
+    .await;
+
+    assert_eq!(rows, vec![(1, "bob".to_string()), (2, "carol".to_string())]);
 }
 
 // ======================= String Primary Key =======================
diff --git a/crates/paimon/src/spec/core_options.rs 
b/crates/paimon/src/spec/core_options.rs
index bafad0a..aafdf52 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -40,6 +40,10 @@ const COMMIT_MAX_RETRY_WAIT_OPTION: &str = 
"commit.max-retry-wait";
 const FILE_COMPRESSION_OPTION: &str = "file.compression";
 const FILE_COMPRESSION_ZSTD_LEVEL_OPTION: &str = "file.compression.zstd-level";
 const FILE_FORMAT_OPTION: &str = "file.format";
+const CHANGELOG_FILE_PREFIX_OPTION: &str = "changelog-file.prefix";
+const CHANGELOG_FILE_FORMAT_OPTION: &str = "changelog-file.format";
+const CHANGELOG_FILE_COMPRESSION_OPTION: &str = "changelog-file.compression";
+const CHANGELOG_FILE_STATS_MODE_OPTION: &str = "changelog-file.stats-mode";
 const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled";
 const WRITE_PARQUET_BUFFER_SIZE_OPTION: &str = "write.parquet-buffer-size";
 const SEQUENCE_FIELD_OPTION: &str = "sequence.field";
@@ -55,6 +59,7 @@ pub const SCAN_VERSION_OPTION: &str = "scan.version";
 const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
 const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
 const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
+const DEFAULT_CHANGELOG_FILE_PREFIX: &str = "changelog-";
 const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024;
 const DEFAULT_WRITE_PARQUET_BUFFER_SIZE: i64 = 256 * 1024 * 1024;
 const DYNAMIC_BUCKET_TARGET_ROW_NUM_OPTION: &str = 
"dynamic-bucket.target-row-num";
@@ -75,6 +80,32 @@ pub enum MergeEngine {
     FirstRow,
 }
 
+/// Changelog producer for table writes.
+///
+/// Reference: Java `CoreOptions.ChangelogProducer`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ChangelogProducer {
+    /// No changelog file.
+    None,
+    /// Double write input rows to changelog files.
+    Input,
+    /// Generate changelog files during full compaction.
+    FullCompaction,
+    /// Generate changelog files through lookup compaction.
+    Lookup,
+}
+
+impl ChangelogProducer {
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            Self::None => "none",
+            Self::Input => "input",
+            Self::FullCompaction => "full-compaction",
+            Self::Lookup => "lookup",
+        }
+    }
+}
+
 /// Format the bucket directory name for a given bucket number.
 /// Returns `"bucket-postpone"` for `POSTPONE_BUCKET` (-2), otherwise 
`"bucket-{N}"`.
 pub fn bucket_dir_name(bucket: i32) -> String {
@@ -138,7 +169,7 @@ impl<'a> CoreOptions<'a> {
         }
     }
 
-    /// Changelog producer setting. Default is "none".
+    /// Raw changelog producer setting. Default is `"none"`.
     pub fn changelog_producer(&self) -> &str {
         self.options
             .get(CHANGELOG_PRODUCER_OPTION)
@@ -146,6 +177,22 @@ impl<'a> CoreOptions<'a> {
             .unwrap_or("none")
     }
 
+    /// Typed changelog producer setting. Default is `None`.
+    pub fn try_changelog_producer(&self) -> crate::Result<ChangelogProducer> {
+        match self.options.get(CHANGELOG_PRODUCER_OPTION) {
+            None => Ok(ChangelogProducer::None),
+            Some(v) => match v.to_ascii_lowercase().as_str() {
+                "none" => Ok(ChangelogProducer::None),
+                "input" => Ok(ChangelogProducer::Input),
+                "full-compaction" => Ok(ChangelogProducer::FullCompaction),
+                "lookup" => Ok(ChangelogProducer::Lookup),
+                other => Err(crate::Error::Unsupported {
+                    message: format!("Unsupported changelog-producer: 
'{other}'"),
+                }),
+            },
+        }
+    }
+
     /// The `rowkind.field` option: a user column whose value encodes the row 
kind.
     pub fn rowkind_field(&self) -> Option<&str> {
         self.options.get(ROWKIND_FIELD_OPTION).map(String::as_str)
@@ -363,6 +410,43 @@ impl<'a> CoreOptions<'a> {
             .unwrap_or(1)
     }
 
+    /// File name prefix for changelog files. Default is `"changelog-"`.
+    pub fn changelog_file_prefix(&self) -> &str {
+        self.options
+            .get(CHANGELOG_FILE_PREFIX_OPTION)
+            .map(String::as_str)
+            .unwrap_or(DEFAULT_CHANGELOG_FILE_PREFIX)
+    }
+
+    /// Effective file format for changelog files.
+    ///
+    /// When `changelog-file.format` is not configured, Java Paimon falls back
+    /// to the table `file.format`.
+    pub fn changelog_file_format(&self) -> &str {
+        self.options
+            .get(CHANGELOG_FILE_FORMAT_OPTION)
+            .map(String::as_str)
+            .unwrap_or_else(|| self.file_format())
+    }
+
+    /// Effective compression codec for changelog files.
+    ///
+    /// When `changelog-file.compression` is not configured, Java Paimon falls
+    /// back to the table `file.compression`.
+    pub fn changelog_file_compression(&self) -> &str {
+        self.options
+            .get(CHANGELOG_FILE_COMPRESSION_OPTION)
+            .map(String::as_str)
+            .unwrap_or_else(|| self.file_compression())
+    }
+
+    /// Metadata stats collection mode for changelog files, if configured.
+    pub fn changelog_file_stats_mode(&self) -> Option<&str> {
+        self.options
+            .get(CHANGELOG_FILE_STATS_MODE_OPTION)
+            .map(String::as_str)
+    }
+
     /// Parquet writer in-progress buffer size limit. Default is 256MB.
     /// When the buffered data exceeds this, the writer flushes the current 
row group.
     pub fn write_parquet_buffer_size(&self) -> i64 {
@@ -546,6 +630,86 @@ mod tests {
         assert_eq!(core.merge_engine().unwrap(), MergeEngine::PartialUpdate);
     }
 
+    #[test]
+    fn test_changelog_producer_defaults_to_none() {
+        let options = HashMap::new();
+        let core = CoreOptions::new(&options);
+
+        assert_eq!(core.changelog_producer(), "none");
+        assert_eq!(
+            core.try_changelog_producer().unwrap(),
+            ChangelogProducer::None
+        );
+    }
+
+    #[test]
+    fn test_changelog_producer_accepts_known_values() {
+        for (value, expected) in [
+            ("none", ChangelogProducer::None),
+            ("input", ChangelogProducer::Input),
+            ("full-compaction", ChangelogProducer::FullCompaction),
+            ("lookup", ChangelogProducer::Lookup),
+            ("INPUT", ChangelogProducer::Input),
+        ] {
+            let options = 
HashMap::from([(CHANGELOG_PRODUCER_OPTION.to_string(), value.into())]);
+            let core = CoreOptions::new(&options);
+
+            assert_eq!(core.try_changelog_producer().unwrap(), expected);
+        }
+    }
+
+    #[test]
+    fn test_changelog_producer_rejects_unknown_values() {
+        let options = HashMap::from([(CHANGELOG_PRODUCER_OPTION.to_string(), 
"other".into())]);
+        let core = CoreOptions::new(&options);
+
+        let err = core
+            .try_changelog_producer()
+            .expect_err("unknown producer should fail");
+        assert!(
+            matches!(err, crate::Error::Unsupported { message } if 
message.contains("Unsupported changelog-producer"))
+        );
+    }
+
+    #[test]
+    fn test_changelog_file_options_defaults_and_overrides() {
+        let default_options = HashMap::from([
+            (FILE_FORMAT_OPTION.to_string(), "avro".to_string()),
+            (FILE_COMPRESSION_OPTION.to_string(), "snappy".to_string()),
+        ]);
+        let default_core = CoreOptions::new(&default_options);
+
+        assert_eq!(default_core.changelog_file_prefix(), "changelog-");
+        assert_eq!(default_core.changelog_file_format(), "avro");
+        assert_eq!(default_core.changelog_file_compression(), "snappy");
+        assert_eq!(default_core.changelog_file_stats_mode(), None);
+
+        let custom_options = HashMap::from([
+            (
+                CHANGELOG_FILE_PREFIX_OPTION.to_string(),
+                "custom-".to_string(),
+            ),
+            (
+                CHANGELOG_FILE_FORMAT_OPTION.to_string(),
+                "parquet".to_string(),
+            ),
+            (
+                CHANGELOG_FILE_COMPRESSION_OPTION.to_string(),
+                "zstd".to_string(),
+            ),
+            (
+                CHANGELOG_FILE_STATS_MODE_OPTION.to_string(),
+                "counts".to_string(),
+            ),
+        ]);
+        let custom_core = CoreOptions::new(&custom_options);
+
+        assert_eq!(custom_core.changelog_file_prefix(), "custom-");
+        assert_eq!(custom_core.changelog_file_format(), "parquet");
+        assert_eq!(custom_core.changelog_file_compression(), "zstd");
+        assert_eq!(custom_core.changelog_file_stats_mode(), Some("counts"));
+    }
+
     #[test]
     fn test_commit_options_defaults() {
         let options = HashMap::new();
diff --git a/crates/paimon/src/table/commit_message.rs 
b/crates/paimon/src/table/commit_message.rs
index ac7efb9..88d3533 100644
--- a/crates/paimon/src/table/commit_message.rs
+++ b/crates/paimon/src/table/commit_message.rs
@@ -29,6 +29,8 @@ pub struct CommitMessage {
     pub bucket: i32,
     /// New data files to be added.
     pub new_files: Vec<DataFileMeta>,
+    /// New changelog files to be added.
+    pub new_changelog_files: Vec<DataFileMeta>,
     /// New index files to be added (used by dynamic bucket mode).
     pub new_index_files: Vec<IndexFileMeta>,
     /// Files to be deleted (copy-on-write rewrite: old files replaced by 
new_files).
@@ -41,6 +43,7 @@ impl CommitMessage {
             partition,
             bucket,
             new_files,
+            new_changelog_files: Vec::new(),
             new_index_files: Vec::new(),
             deleted_files: Vec::new(),
         }
diff --git a/crates/paimon/src/table/kv_file_writer.rs 
b/crates/paimon/src/table/kv_file_writer.rs
index a6d7180..55f1d6d 100644
--- a/crates/paimon/src/table/kv_file_writer.rs
+++ b/crates/paimon/src/table/kv_file_writer.rs
@@ -31,10 +31,12 @@ use crate::io::FileIO;
 use crate::spec::stats::{compute_column_stats, BinaryTableStats};
 use crate::spec::{
     extract_datum_from_arrow, BinaryRowBuilder, DataFileMeta, DataType, 
MergeEngine,
-    PartialUpdateConfig, EMPTY_SERIALIZED_ROW, SEQUENCE_NUMBER_FIELD_NAME, 
VALUE_KIND_FIELD_NAME,
+    PartialUpdateConfig, RowKind, EMPTY_SERIALIZED_ROW, 
SEQUENCE_NUMBER_FIELD_NAME,
+    VALUE_KIND_FIELD_NAME,
 };
+use crate::table::prepared_files::PreparedFiles;
 use crate::Result;
-use arrow_array::{Int64Array, Int8Array, RecordBatch};
+use arrow_array::{Array, Int64Array, Int8Array, RecordBatch, UInt32Array};
 use arrow_ord::sort::{lexsort_to_indices, SortColumn, SortOptions};
 use arrow_row::{RowConverter, SortField};
 use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as 
ArrowSchema};
@@ -55,6 +57,8 @@ pub(crate) struct KeyValueFileWriter {
     buffer_bytes: usize,
     /// Completed file metadata.
     written_files: Vec<DataFileMeta>,
+    /// Completed changelog file metadata.
+    written_changelog_files: Vec<DataFileMeta>,
 }
 
 /// Configuration for [`KeyValueFileWriter`], grouping file-location, schema,
@@ -70,6 +74,10 @@ pub(crate) struct KeyValueWriteConfig {
     pub file_compression_zstd_level: i32,
     pub write_buffer_size: i64,
     pub file_format: String,
+    pub input_changelog: bool,
+    pub changelog_file_prefix: String,
+    pub changelog_file_compression: String,
+    pub changelog_file_format: String,
     /// Primary key column indices in the user schema.
     pub primary_key_indices: Vec<usize>,
     /// Paimon DataTypes for each primary key column (same order as 
primary_key_indices).
@@ -81,6 +89,16 @@ pub(crate) struct KeyValueWriteConfig {
     pub deletion_vectors_enabled: bool,
 }
 
+struct IndexedFileWrite<'a> {
+    file_prefix: &'a str,
+    file_ordinal: usize,
+    file_format: &'a str,
+    file_compression: &'a str,
+    min_sequence_number: i64,
+    max_sequence_number: i64,
+    delete_row_count: i64,
+}
+
 impl KeyValueFileWriter {
     pub(crate) fn new(
         file_io: FileIO,
@@ -108,6 +126,7 @@ impl KeyValueFileWriter {
             buffer: Vec::new(),
             buffer_bytes: 0,
             written_files: Vec::new(),
+            written_changelog_files: Vec::new(),
         })
     }
 
@@ -208,23 +227,82 @@ impl KeyValueFileWriter {
         //   FirstRow      → keep first row per key group (lowest seq)
         //   PartialUpdate → keep all rows for read-side field-wise merge
         let selected_indices = self.select_flush_indices(&combined, 
&sorted_indices)?;
-        let selected_num_rows = selected_indices.len();
+        let selected_u32 = UInt32Array::from(selected_indices);
 
-        // Extract min_key / max_key from selected endpoints.
-        let first_row = selected_indices[0] as usize;
-        let last_row = selected_indices[selected_num_rows - 1] as usize;
-        let min_key = self.extract_key_binary_row(&combined, first_row)?;
-        let max_key = self.extract_key_binary_row(&combined, last_row)?;
+        let data_delete_row_count = Self::indexed_delete_row_count(&combined, 
&selected_u32)?;
+        let changelog_delete_row_count = if self.config.input_changelog {
+            Some(Self::indexed_delete_row_count(&combined, &sorted_indices)?)
+        } else {
+            None
+        };
 
-        // Build physical schema and open writer.
-        let physical_schema = build_physical_schema(&user_schema);
+        let data_file = self
+            .write_indexed_file(
+                &combined,
+                seq_array.as_ref(),
+                &selected_u32,
+                IndexedFileWrite {
+                    file_prefix: "data-",
+                    file_ordinal: self.written_files.len(),
+                    file_format: &self.config.file_format,
+                    file_compression: &self.config.file_compression,
+                    min_sequence_number: start_seq,
+                    max_sequence_number: end_seq,
+                    delete_row_count: data_delete_row_count,
+                },
+            )
+            .await?;
+        self.written_files.push(data_file);
+
+        if let Some(delete_row_count) = changelog_delete_row_count {
+            let changelog_file = self
+                .write_indexed_file(
+                    &combined,
+                    seq_array.as_ref(),
+                    &sorted_indices,
+                    IndexedFileWrite {
+                        file_prefix: &self.config.changelog_file_prefix,
+                        file_ordinal: self.written_changelog_files.len(),
+                        file_format: &self.config.changelog_file_format,
+                        file_compression: 
&self.config.changelog_file_compression,
+                        min_sequence_number: start_seq,
+                        max_sequence_number: end_seq,
+                        delete_row_count,
+                    },
+                )
+                .await?;
+            self.written_changelog_files.push(changelog_file);
+        }
+        Ok(())
+    }
+
+    async fn write_indexed_file(
+        &self,
+        batch: &RecordBatch,
+        seq_array: &dyn Array,
+        indices: &UInt32Array,
+        write: IndexedFileWrite<'_>,
+    ) -> Result<DataFileMeta> {
+        if indices.is_empty() {
+            return Err(crate::Error::DataInvalid {
+                message: "Cannot write an empty key-value data 
file".to_string(),
+                source: None,
+            });
+        }
 
-        // Open file writer.
+        let user_schema = batch.schema();
+        let first_row = indices.value(0) as usize;
+        let last_row = indices.value(indices.len() - 1) as usize;
+        let min_key = self.extract_key_binary_row(batch, first_row)?;
+        let max_key = self.extract_key_binary_row(batch, last_row)?;
+
+        let physical_schema = build_physical_schema(&user_schema);
         let file_name = format!(
-            "data-{}-{}.{}",
+            "{}{}-{}.{}",
+            write.file_prefix,
             uuid::Uuid::new_v4(),
-            self.written_files.len(),
-            self.config.file_format,
+            write.file_ordinal,
+            write.file_format,
         );
         let bucket_dir = if self.config.partition_path.is_empty() {
             format!(
@@ -238,44 +316,42 @@ impl KeyValueFileWriter {
             )
         };
         self.file_io.mkdirs(&format!("{bucket_dir}/")).await?;
-        let file_path = format!("{}/{}", bucket_dir, file_name);
+        let file_path = format!("{bucket_dir}/{file_name}");
         let output = self.file_io.new_output(&file_path)?;
         let mut writer = create_format_writer(
             &output,
             physical_schema.clone(),
-            &self.config.file_compression,
+            write.file_compression,
             self.config.file_compression_zstd_level,
             None,
         )
         .await?;
 
-        // Chunked write using selected indices.
-        let selected_u32 = arrow_array::UInt32Array::from(selected_indices);
-        for chunk_start in 
(0..selected_num_rows).step_by(Self::FLUSH_CHUNK_ROWS) {
-            let chunk_len = Self::FLUSH_CHUNK_ROWS.min(selected_num_rows - 
chunk_start);
-            let chunk_indices = selected_u32.slice(chunk_start, chunk_len);
+        let vk_idx = batch
+            .schema()
+            .fields()
+            .iter()
+            .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME);
+
+        for chunk_start in (0..indices.len()).step_by(Self::FLUSH_CHUNK_ROWS) {
+            let chunk_len = Self::FLUSH_CHUNK_ROWS.min(indices.len() - 
chunk_start);
+            let chunk_indices = indices.slice(chunk_start, chunk_len);
 
-            let mut physical_columns: Vec<Arc<dyn arrow_array::Array>> = 
Vec::new();
-            // Sequence numbers for this chunk.
+            let mut physical_columns: Vec<Arc<dyn Array>> = Vec::new();
             physical_columns.push(
-                arrow_select::take::take(seq_array.as_ref(), &chunk_indices, 
None).map_err(
-                    |e| crate::Error::DataInvalid {
+                arrow_select::take::take(seq_array, &chunk_indices, 
None).map_err(|e| {
+                    crate::Error::DataInvalid {
                         message: format!("Failed to reorder sequence numbers: 
{e}"),
                         source: None,
-                    },
-                )?,
+                    }
+                })?,
             );
-            // Value kind column — resolve from batch schema.
-            let vk_idx = combined
-                .schema()
-                .fields()
-                .iter()
-                .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME);
+
             match vk_idx {
                 Some(vk_idx) => {
                     physical_columns.push(
                         arrow_select::take::take(
-                            combined.column(vk_idx).as_ref(),
+                            batch.column(vk_idx).as_ref(),
                             &chunk_indices,
                             None,
                         )
@@ -286,21 +362,20 @@ impl KeyValueFileWriter {
                     );
                 }
                 None => {
-                    // All rows are INSERT (value_kind = 0).
                     physical_columns.push(Arc::new(Int8Array::from(vec![0i8; 
chunk_len])));
                 }
             }
-            // All user columns (skip _VALUE_KIND if present — already handled 
above).
-            for idx in 0..combined.num_columns() {
+
+            for idx in 0..batch.num_columns() {
                 if Some(idx) == vk_idx {
                     continue;
                 }
                 physical_columns.push(
-                    arrow_select::take::take(combined.column(idx).as_ref(), 
&chunk_indices, None)
+                    arrow_select::take::take(batch.column(idx).as_ref(), 
&chunk_indices, None)
                         .map_err(|e| crate::Error::DataInvalid {
-                        message: format!("Failed to reorder by sort indices: 
{e}"),
-                        source: None,
-                    })?,
+                            message: format!("Failed to reorder by sort 
indices: {e}"),
+                            source: None,
+                        })?,
                 );
             }
 
@@ -314,20 +389,20 @@ impl KeyValueFileWriter {
 
         let file_size = writer.close().await? as i64;
 
-        // Compute key_stats on selected output rows (not the raw combined 
batch).
-        let selected_key_columns: Vec<Arc<dyn arrow_array::Array>> = self
+        let key_columns: Vec<Arc<dyn Array>> = self
             .config
             .primary_key_indices
             .iter()
             .map(|&idx| {
-                arrow_select::take::take(combined.column(idx).as_ref(), 
&selected_u32, None)
-                    .map_err(|e| crate::Error::DataInvalid {
+                arrow_select::take::take(batch.column(idx).as_ref(), indices, 
None).map_err(|e| {
+                    crate::Error::DataInvalid {
                         message: format!("Failed to take key column for stats: 
{e}"),
                         source: None,
-                    })
+                    }
+                })
             })
             .collect::<Result<Vec<_>>>()?;
-        let selected_key_batch = RecordBatch::try_new(
+        let key_batch = RecordBatch::try_new(
             Arc::new(ArrowSchema::new(
                 self.config
                     .primary_key_indices
@@ -335,24 +410,23 @@ impl KeyValueFileWriter {
                     .map(|&idx| user_schema.field(idx).clone())
                     .collect::<Vec<_>>(),
             )),
-            selected_key_columns,
+            key_columns,
         )
         .map_err(|e| crate::Error::DataInvalid {
-            message: format!("Failed to build selected key batch for stats: 
{e}"),
+            message: format!("Failed to build key batch for stats: {e}"),
             source: None,
         })?;
         let stats_col_indices: Vec<usize> = 
(0..self.config.primary_key_indices.len()).collect();
         let key_stats = compute_column_stats(
-            &selected_key_batch,
+            &key_batch,
             &stats_col_indices,
             &self.config.primary_key_types,
         )?;
 
-        // Sequence numbers span the full assigned range.
-        let meta = DataFileMeta {
+        Ok(DataFileMeta {
             file_name,
             file_size,
-            row_count: selected_num_rows as i64,
+            row_count: indices.len() as i64,
             min_key,
             max_key,
             key_stats,
@@ -361,22 +435,54 @@ impl KeyValueFileWriter {
                 EMPTY_SERIALIZED_ROW.clone(),
                 vec![],
             ),
-            min_sequence_number: start_seq,
-            max_sequence_number: end_seq,
+            min_sequence_number: write.min_sequence_number,
+            max_sequence_number: write.max_sequence_number,
             schema_id: self.config.schema_id,
             level: 0,
             extra_files: vec![],
             creation_time: Some(Utc::now()),
-            delete_row_count: Some(0),
+            delete_row_count: Some(write.delete_row_count),
             embedded_index: None,
             file_source: Some(0), // FileSource.APPEND
             value_stats_cols: Some(vec![]),
             external_path: None,
             first_row_id: None,
             write_cols: None,
+        })
+    }
+
+    fn indexed_delete_row_count(batch: &RecordBatch, indices: &UInt32Array) -> 
Result<i64> {
+        let Some(vk_idx) = batch
+            .schema()
+            .fields()
+            .iter()
+            .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME)
+        else {
+            return Ok(0);
         };
-        self.written_files.push(meta);
-        Ok(())
+
+        let column = batch.column(vk_idx);
+        let Some(value_kinds) = column.as_any().downcast_ref::<Int8Array>() 
else {
+            return Err(crate::Error::DataInvalid {
+                message: "_VALUE_KIND column must be Int8".to_string(),
+                source: None,
+            });
+        };
+
+        let mut delete_count = 0;
+        for idx in 0..indices.len() {
+            let row = indices.value(idx) as usize;
+            let value = if column.is_null(row) {
+                0
+            } else {
+                value_kinds.value(row)
+            };
+            match RowKind::from_value(value)? {
+                RowKind::UpdateBefore | RowKind::Delete => delete_count += 1,
+                RowKind::Insert | RowKind::UpdateAfter => {}
+            }
+        }
+        Ok(delete_count)
     }
 
     /// Select output row indices from sorted inputs according to merge engine.
@@ -468,9 +574,12 @@ impl KeyValueFileWriter {
     }
 
     /// Flush remaining buffer and return all written file metadata.
-    pub(crate) async fn prepare_commit(&mut self) -> Result<Vec<DataFileMeta>> 
{
+    pub(crate) async fn prepare_commit(&mut self) -> Result<PreparedFiles> {
         self.flush().await?;
-        Ok(std::mem::take(&mut self.written_files))
+        Ok(PreparedFiles {
+            data_files: std::mem::take(&mut self.written_files),
+            changelog_files: std::mem::take(&mut self.written_changelog_files),
+        })
     }
 
     /// Extract primary key columns from a batch at a given row index into a 
serialized BinaryRow.
@@ -539,6 +648,10 @@ mod tests {
             file_compression_zstd_level: 0,
             write_buffer_size: 1024,
             file_format: "parquet".to_string(),
+            input_changelog: false,
+            changelog_file_prefix: "changelog-".to_string(),
+            changelog_file_compression: "none".to_string(),
+            changelog_file_format: "parquet".to_string(),
             primary_key_indices: vec![0],
             primary_key_types: vec![DataType::Int(IntType::new())],
             sequence_field_indices: vec![1],
@@ -610,6 +723,33 @@ mod tests {
         assert_eq!(selected, vec![0, 1]);
     }
 
+    #[test]
+    fn test_indexed_delete_row_count_rejects_invalid_value_kind() {
+        let schema = Arc::new(ArrowSchema::new(vec![
+            Arc::new(ArrowField::new("id", ArrowDataType::Int32, false)),
+            Arc::new(ArrowField::new(
+                VALUE_KIND_FIELD_NAME,
+                ArrowDataType::Int8,
+                false,
+            )),
+        ]));
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(Int32Array::from(vec![1])) as Arc<dyn 
arrow_array::Array>,
+                Arc::new(Int8Array::from(vec![4])) as Arc<dyn 
arrow_array::Array>,
+            ],
+        )
+        .unwrap();
+        let indices = UInt32Array::from(vec![0]);
+
+        let err = KeyValueFileWriter::indexed_delete_row_count(&batch, 
&indices).unwrap_err();
+
+        assert!(
+            matches!(err, crate::Error::DataInvalid { message, .. } if 
message.contains("Invalid RowKind value"))
+        );
+    }
+
     #[test]
     fn test_new_rejects_partial_update_with_deletion_vectors() {
         let mut config = test_write_config(MergeEngine::PartialUpdate);
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index c00fa78..c425a9e 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -39,6 +39,7 @@ mod kv_file_reader;
 mod kv_file_writer;
 mod partition_filter;
 mod postpone_file_writer;
+mod prepared_files;
 mod read_builder;
 pub mod referenced_files;
 pub(crate) mod rest_env;
diff --git a/crates/paimon/src/table/prepared_files.rs 
b/crates/paimon/src/table/prepared_files.rs
new file mode 100644
index 0000000..d2c89f9
--- /dev/null
+++ b/crates/paimon/src/table/prepared_files.rs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::spec::DataFileMeta;
+
+/// Files produced by closing a writer.
+pub(crate) struct PreparedFiles {
+    pub data_files: Vec<DataFileMeta>,
+    pub changelog_files: Vec<DataFileMeta>,
+}
+
+impl PreparedFiles {
+    pub(crate) fn data(data_files: Vec<DataFileMeta>) -> Self {
+        Self {
+            data_files,
+            changelog_files: Vec::new(),
+        }
+    }
+}
diff --git a/crates/paimon/src/table/table_commit.rs 
b/crates/paimon/src/table/table_commit.rs
index 0de494e..664b654 100644
--- a/crates/paimon/src/table/table_commit.rs
+++ b/crates/paimon/src/table/table_commit.rs
@@ -98,9 +98,11 @@ impl TableCommit {
         }
 
         let entries = self.messages_to_entries(&commit_messages);
+        let changelog_entries = 
self.messages_to_changelog_entries(&commit_messages);
         let new_index_entries = 
self.messages_to_index_entries(&commit_messages);
         self.try_commit(CommitEntriesPlan::Direct {
             entries,
+            changelog_entries,
             new_index_entries,
         })
         .await
@@ -129,6 +131,9 @@ impl TableCommit {
 
         let new_entries = self.messages_to_entries(&commit_messages);
         let new_index_entries = 
self.messages_to_index_entries(&commit_messages);
+        let has_new_data_entries = new_entries
+            .iter()
+            .any(|entry| *entry.kind() == FileKind::Add);
 
         let partition_filter = if let Some(sp) = static_partitions {
             let partition_keys = self.table.schema().partition_keys();
@@ -144,8 +149,10 @@ impl TableCommit {
             } else {
                 Some(self.build_static_partition_predicate(&sp, 
&partition_fields)?)
             }
+        } else if !self.table.schema().partition_fields().is_empty() && 
!has_new_data_entries {
+            return Ok(());
         } else {
-            self.build_dynamic_partition_filter(&commit_messages)?
+            self.build_dynamic_partition_filter(&new_entries)?
         };
 
         self.try_commit(CommitEntriesPlan::Overwrite {
@@ -182,13 +189,13 @@ impl TableCommit {
         Ok(PartitionFilter::from_predicate(combined, partition_fields))
     }
 
-    /// Build a dynamic partition filter from the partitions present in commit 
messages.
+    /// Build a dynamic partition filter from the partitions present in new 
data entries.
     ///
     /// Returns `None` for unpartitioned tables (full table overwrite).
     /// Uses `PartitionSet` for O(1) byte-level matching.
     fn build_dynamic_partition_filter(
         &self,
-        commit_messages: &[CommitMessage],
+        entries: &[ManifestEntry],
     ) -> Result<Option<PartitionFilter>> {
         let partition_fields = self.table.schema().partition_fields();
         if partition_fields.is_empty() {
@@ -196,8 +203,10 @@ impl TableCommit {
         }
 
         let mut partition_bytes_set: HashSet<Vec<u8>> = HashSet::new();
-        for msg in commit_messages {
-            partition_bytes_set.insert(msg.partition.clone());
+        for entry in entries {
+            if *entry.kind() == FileKind::Add {
+                partition_bytes_set.insert(entry.partition().to_vec());
+            }
         }
 
         Ok(Some(PartitionFilter::from_partition_set(
@@ -298,7 +307,7 @@ impl TableCommit {
             let latest_snapshot = 
self.snapshot_manager.get_latest_snapshot().await?;
             let resolved = self.resolve_commit(&plan, &latest_snapshot).await?;
 
-            if resolved.entries.is_empty() {
+            if resolved.entries.is_empty() && 
resolved.changelog_entries.is_empty() {
                 break;
             }
 
@@ -376,11 +385,15 @@ impl TableCommit {
         let unique_id = uuid::Uuid::new_v4();
         let base_manifest_list_name = format!("manifest-list-{unique_id}-0");
         let delta_manifest_list_name = format!("manifest-list-{unique_id}-1");
+        let changelog_manifest_list_name = 
format!("manifest-list-{unique_id}-2");
         let new_manifest_name = format!("manifest-{}-0", uuid::Uuid::new_v4());
+        let changelog_manifest_name = format!("manifest-{}-1", 
uuid::Uuid::new_v4());
 
         let base_manifest_list_path = 
format!("{manifest_dir}/{base_manifest_list_name}");
         let delta_manifest_list_path = 
format!("{manifest_dir}/{delta_manifest_list_name}");
+        let changelog_manifest_list_path = 
format!("{manifest_dir}/{changelog_manifest_list_name}");
         let new_manifest_path = format!("{manifest_dir}/{new_manifest_name}");
+        let changelog_manifest_path = 
format!("{manifest_dir}/{changelog_manifest_name}");
 
         // Write manifest file
         let new_manifest_file_meta = self
@@ -400,6 +413,32 @@ impl TableCommit {
         )
         .await?;
 
+        let changelog_record_count = if resolved.changelog_entries.is_empty() {
+            None
+        } else {
+            let changelog_manifest_file_meta = self
+                .write_manifest_file(
+                    file_io,
+                    &changelog_manifest_path,
+                    &changelog_manifest_name,
+                    &resolved.changelog_entries,
+                )
+                .await?;
+            ManifestList::write(
+                file_io,
+                &changelog_manifest_list_path,
+                &[changelog_manifest_file_meta],
+            )
+            .await?;
+            Some(
+                resolved
+                    .changelog_entries
+                    .iter()
+                    .map(|entry| entry.file().row_count)
+                    .sum(),
+            )
+        };
+
         // Read existing manifests (base + delta from previous snapshot) and 
write base manifest list
         let mut total_record_count: i64 = 0;
         let existing_manifest_files = if let Some(snap) = latest_snapshot {
@@ -441,6 +480,8 @@ impl TableCommit {
             .time_millis(current_time_millis())
             .total_record_count(Some(total_record_count))
             .delta_record_count(Some(delta_record_count))
+            .changelog_manifest_list(changelog_record_count.map(|_| 
changelog_manifest_list_name))
+            .changelog_record_count(changelog_record_count)
             .next_row_id(next_row_id)
             .index_manifest(resolved.index_manifest_name)
             .build();
@@ -533,6 +574,7 @@ impl TableCommit {
         match plan {
             CommitEntriesPlan::Direct {
                 entries,
+                changelog_entries,
                 new_index_entries,
             } => {
                 if self.row_tracking_enabled {
@@ -576,6 +618,7 @@ impl TableCommit {
 
                 Ok(ResolvedCommit {
                     entries: entries.clone(),
+                    changelog_entries: changelog_entries.clone(),
                     kind,
                     index_manifest_name,
                 })
@@ -613,6 +656,7 @@ impl TableCommit {
 
                 Ok(ResolvedCommit {
                     entries,
+                    changelog_entries: vec![],
                     kind: CommitKind::OVERWRITE,
                     index_manifest_name,
                 })
@@ -1031,6 +1075,25 @@ impl TableCommit {
             .collect()
     }
 
+    /// Convert commit messages to changelog manifest entries (ADD kind only).
+    fn messages_to_changelog_entries(&self, messages: &[CommitMessage]) -> 
Vec<ManifestEntry> {
+        messages
+            .iter()
+            .flat_map(|msg| {
+                msg.new_changelog_files.iter().map(|file| {
+                    ManifestEntry::new(
+                        FileKind::Add,
+                        msg.partition.clone(),
+                        msg.bucket,
+                        self.total_buckets,
+                        file.clone(),
+                        0,
+                    )
+                })
+            })
+            .collect()
+    }
+
     /// Convert commit messages to index manifest entries (ADD kind).
     fn messages_to_index_entries(&self, messages: &[CommitMessage]) -> 
Vec<IndexManifestEntry> {
         messages
@@ -1056,6 +1119,7 @@ enum CommitEntriesPlan {
     /// rewrites, in which case `resolve_commit` auto-promotes to 
`CommitKind::OVERWRITE`.
     Direct {
         entries: Vec<ManifestEntry>,
+        changelog_entries: Vec<ManifestEntry>,
         new_index_entries: Vec<IndexManifestEntry>,
     },
     /// Overwrite with optional partition filter.
@@ -1069,6 +1133,7 @@ enum CommitEntriesPlan {
 /// Fully resolved commit ready for writing.
 struct ResolvedCommit {
     entries: Vec<ManifestEntry>,
+    changelog_entries: Vec<ManifestEntry>,
     kind: CommitKind,
     index_manifest_name: Option<String>,
 }
@@ -1371,6 +1436,35 @@ mod tests {
         assert_eq!(snapshot.total_record_count(), Some(250));
     }
 
+    #[tokio::test]
+    async fn test_dynamic_overwrite_ignores_changelog_only_message() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_dynamic_overwrite_changelog_only";
+        setup_dirs(&file_io, table_path).await;
+
+        let commit = setup_partitioned_commit(&file_io, table_path);
+        commit
+            .commit(vec![CommitMessage::new(
+                partition_bytes("a"),
+                0,
+                vec![test_data_file("data-a.parquet", 100)],
+            )])
+            .await
+            .unwrap();
+
+        let mut message = CommitMessage::new(partition_bytes("a"), 0, vec![]);
+        message.new_changelog_files = 
vec![test_data_file("changelog-a.parquet", 1)];
+
+        commit.overwrite(vec![message], None).await.unwrap();
+
+        let snap_manager = SnapshotManager::new(file_io, 
table_path.to_string());
+        let snapshot = 
snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+        assert_eq!(snapshot.id(), 1);
+        assert_eq!(snapshot.commit_kind(), &CommitKind::APPEND);
+        assert_eq!(snapshot.total_record_count(), Some(100));
+        assert_eq!(snapshot.changelog_manifest_list(), None);
+    }
+
     #[tokio::test]
     async fn test_drop_partitions() {
         let file_io = test_file_io();
@@ -1622,6 +1716,26 @@ mod tests {
         assert_eq!(snapshot.total_record_count(), Some(350));
     }
 
+    #[tokio::test]
+    async fn test_overwrite_ignores_changelog_files() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_overwrite_changelog_files";
+        setup_dirs(&file_io, table_path).await;
+
+        let commit = setup_commit(&file_io, table_path);
+        let mut message = CommitMessage::new(vec![], 0, 
vec![test_data_file("data.parquet", 1)]);
+        message.new_changelog_files = vec![test_data_file("changelog.parquet", 
1)];
+
+        commit.overwrite(vec![message], None).await.unwrap();
+
+        let snap_manager = SnapshotManager::new(file_io, 
table_path.to_string());
+        let snapshot = 
snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+        assert_eq!(snapshot.commit_kind(), &CommitKind::OVERWRITE);
+        assert_eq!(snapshot.total_record_count(), Some(1));
+        assert_eq!(snapshot.changelog_record_count(), None);
+        assert_eq!(snapshot.changelog_manifest_list(), None);
+    }
+
     #[tokio::test]
     async fn test_delete_conflict_rejects_missing_file() {
         let file_io = test_file_io();
diff --git a/crates/paimon/src/table/table_write.rs 
b/crates/paimon/src/table/table_write.rs
index 49c0019..f34c124 100644
--- a/crates/paimon/src/table/table_write.rs
+++ b/crates/paimon/src/table/table_write.rs
@@ -21,10 +21,10 @@
 //! and [pypaimon 
FileStoreWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/file_store_write.py)
 
 use crate::arrow::build_target_arrow_schema;
-use crate::spec::DataFileMeta;
 use crate::spec::PartitionComputer;
 use crate::spec::{
-    BinaryRow, CoreOptions, DataType, MergeEngine, EMPTY_SERIALIZED_ROW, 
POSTPONE_BUCKET,
+    BinaryRow, ChangelogProducer, CoreOptions, DataType, MergeEngine, 
EMPTY_SERIALIZED_ROW,
+    POSTPONE_BUCKET,
 };
 use crate::table::blob_file_writer::AppendBlobFileWriter;
 use crate::table::bucket_assigner::{BucketAssignerEnum, PartitionBucketKey};
@@ -37,6 +37,7 @@ use crate::table::data_file_writer::DataFileWriter;
 use crate::table::kv_file_writer::{KeyValueFileWriter, KeyValueWriteConfig};
 use crate::table::partition_filter::PartitionFilter;
 use crate::table::postpone_file_writer::{PostponeFileWriter, 
PostponeWriteConfig};
+use crate::table::prepared_files::PreparedFiles;
 use crate::table::{SnapshotManager, Table, TableScan};
 use crate::Result;
 use arrow_array::RecordBatch;
@@ -61,12 +62,12 @@ impl FileWriter {
         }
     }
 
-    async fn prepare_commit(mut self) -> Result<Vec<DataFileMeta>> {
+    async fn prepare_commit(mut self) -> Result<PreparedFiles> {
         match self {
-            FileWriter::Append(ref mut w) => w.prepare_commit().await,
-            FileWriter::AppendBlob(ref mut w) => w.prepare_commit().await,
+            FileWriter::Append(ref mut w) => 
w.prepare_commit().await.map(PreparedFiles::data),
+            FileWriter::AppendBlob(ref mut w) => 
w.prepare_commit().await.map(PreparedFiles::data),
             FileWriter::KeyValue(ref mut w) => w.prepare_commit().await,
-            FileWriter::Postpone(ref mut w) => w.prepare_commit().await,
+            FileWriter::Postpone(ref mut w) => 
w.prepare_commit().await.map(PreparedFiles::data),
         }
     }
 }
@@ -96,6 +97,10 @@ pub struct TableWrite {
     primary_key_types: Vec<DataType>,
     sequence_field_indices: Vec<usize>,
     merge_engine: MergeEngine,
+    changelog_producer: ChangelogProducer,
+    changelog_file_prefix: String,
+    changelog_file_format: String,
+    changelog_file_compression: String,
     partition_seq_cache: HashMap<Vec<u8>, HashMap<i32, i64>>,
     commit_user: String,
     /// Bucket assignment strategy (fixed, dynamic, or cross-partition).
@@ -138,6 +143,7 @@ impl TableWrite {
         let total_buckets = core_options.bucket();
         let has_primary_keys = !schema.primary_keys().is_empty();
         let is_dynamic_bucket = has_primary_keys && total_buckets == -1;
+        let changelog_producer = core_options.try_changelog_producer()?;
 
         let is_dynamic_cross_partition =
             is_dynamic_bucket && !schema.partition_keys().is_empty() && {
@@ -160,16 +166,6 @@ impl TableWrite {
                 ),
             });
         }
-        if has_primary_keys
-            && total_buckets != POSTPONE_BUCKET
-            && core_options
-                .changelog_producer()
-                .eq_ignore_ascii_case("input")
-        {
-            return Err(crate::Error::Unsupported {
-                message: "KeyValueFileWriter does not support 
changelog-producer=input".to_string(),
-            });
-        }
 
         if !has_primary_keys && total_buckets != -1 && 
core_options.bucket_key().is_none() {
             return Err(crate::Error::Unsupported {
@@ -181,6 +177,9 @@ impl TableWrite {
         let file_compression = core_options.file_compression().to_string();
         let file_compression_zstd_level = 
core_options.file_compression_zstd_level();
         let file_format = core_options.file_format().to_string();
+        let changelog_file_prefix = 
core_options.changelog_file_prefix().to_string();
+        let changelog_file_format = 
core_options.changelog_file_format().to_string();
+        let changelog_file_compression = 
core_options.changelog_file_compression().to_string();
         let write_buffer_size = core_options.write_parquet_buffer_size();
         let partition_keys: Vec<String> = schema.partition_keys().to_vec();
         let fields = schema.fields();
@@ -295,6 +294,10 @@ impl TableWrite {
             primary_key_types,
             sequence_field_indices,
             merge_engine,
+            changelog_producer,
+            changelog_file_prefix,
+            changelog_file_format,
+            changelog_file_compression,
             partition_seq_cache: HashMap::new(),
             commit_user,
             bucket_assigner,
@@ -543,8 +546,12 @@ impl TableWrite {
         for (partition_bytes, bucket, files) in results {
             let key = (partition_bytes.clone(), bucket);
             let index_files = 
index_files_by_key.remove(&key).unwrap_or_default();
-            if !files.is_empty() || !index_files.is_empty() {
-                let mut msg = CommitMessage::new(partition_bytes, bucket, 
files);
+            if !files.data_files.is_empty()
+                || !files.changelog_files.is_empty()
+                || !index_files.is_empty()
+            {
+                let mut msg = CommitMessage::new(partition_bytes, bucket, 
files.data_files);
+                msg.new_changelog_files = files.changelog_files;
                 msg.new_index_files = index_files;
                 messages.push(msg);
             }
@@ -682,6 +689,11 @@ impl TableWrite {
                 file_compression_zstd_level: self.file_compression_zstd_level,
                 write_buffer_size: self.write_buffer_size,
                 file_format: self.file_format.clone(),
+                input_changelog: self.changelog_producer == 
ChangelogProducer::Input
+                    && !self.is_overwrite,
+                changelog_file_prefix: self.changelog_file_prefix.clone(),
+                changelog_file_compression: 
self.changelog_file_compression.clone(),
+                changelog_file_format: self.changelog_file_format.clone(),
                 primary_key_indices: self.primary_key_indices.clone(),
                 primary_key_types: self.primary_key_types.clone(),
                 sequence_field_indices: self.sequence_field_indices.clone(),
@@ -697,15 +709,18 @@ impl TableWrite {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::arrow::format::create_format_reader;
     use crate::catalog::Identifier;
     use crate::io::{FileIO, FileIOBuilder};
     use crate::spec::{
-        BinaryRowBuilder, BlobType, DataType, DecimalType, IntType, 
LocalZonedTimestampType,
-        Schema, TableSchema, TimestampType, VarCharType,
+        bucket_dir_name, BigIntType, BinaryRowBuilder, BlobType, DataField, 
DataType, DecimalType,
+        FileKind, IndexManifest, IntType, LocalZonedTimestampType, Manifest, 
ManifestList, Schema,
+        TableSchema, TimestampType, TinyIntType, VarCharType, 
SEQUENCE_NUMBER_FIELD_ID,
+        SEQUENCE_NUMBER_FIELD_NAME, VALUE_KIND_FIELD_ID, VALUE_KIND_FIELD_NAME,
     };
     use crate::table::{SnapshotManager, TableCommit};
-    use arrow_array::Int32Array;
     use arrow_array::RecordBatchReader as _;
+    use arrow_array::{Int32Array, Int64Array, Int8Array};
     use arrow_schema::{
         DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, 
TimeUnit,
     };
@@ -790,6 +805,147 @@ mod tests {
         .unwrap()
     }
 
+    fn make_batch_with_value_kind(
+        ids: Vec<i32>,
+        values: Vec<i32>,
+        value_kinds: Vec<i8>,
+    ) -> RecordBatch {
+        let schema = Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("id", ArrowDataType::Int32, false),
+            ArrowField::new("value", ArrowDataType::Int32, false),
+            ArrowField::new(
+                crate::spec::VALUE_KIND_FIELD_NAME,
+                ArrowDataType::Int8,
+                false,
+            ),
+        ]));
+        RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(Int32Array::from(ids)),
+                Arc::new(Int32Array::from(values)),
+                Arc::new(Int8Array::from(value_kinds)),
+            ],
+        )
+        .unwrap()
+    }
+
+    fn make_partitioned_batch_with_value_kind(
+        pts: Vec<&str>,
+        ids: Vec<i32>,
+        values: Vec<i32>,
+        value_kinds: Vec<i8>,
+    ) -> RecordBatch {
+        let schema = Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("pt", ArrowDataType::Utf8, false),
+            ArrowField::new("id", ArrowDataType::Int32, false),
+            ArrowField::new("value", ArrowDataType::Int32, false),
+            ArrowField::new(
+                crate::spec::VALUE_KIND_FIELD_NAME,
+                ArrowDataType::Int8,
+                false,
+            ),
+        ]));
+        RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(arrow_array::StringArray::from(pts)),
+                Arc::new(Int32Array::from(ids)),
+                Arc::new(Int32Array::from(values)),
+                Arc::new(Int8Array::from(value_kinds)),
+            ],
+        )
+        .unwrap()
+    }
+
+    fn physical_key_value_fields() -> Vec<DataField> {
+        vec![
+            DataField::new(
+                SEQUENCE_NUMBER_FIELD_ID,
+                SEQUENCE_NUMBER_FIELD_NAME.to_string(),
+                DataType::BigInt(BigIntType::new()),
+            ),
+            DataField::new(
+                VALUE_KIND_FIELD_ID,
+                VALUE_KIND_FIELD_NAME.to_string(),
+                DataType::TinyInt(TinyIntType::new()),
+            ),
+            DataField::new(0, "id".to_string(), DataType::Int(IntType::new())),
+            DataField::new(1, "value".to_string(), 
DataType::Int(IntType::new())),
+        ]
+    }
+
+    async fn read_physical_key_value_batches(
+        file_io: &FileIO,
+        file_path: &str,
+        file_size: i64,
+    ) -> Vec<RecordBatch> {
+        let format_reader = create_format_reader(file_path, false).unwrap();
+        let input = file_io.new_input(file_path).unwrap();
+        let file_reader = input.reader().await.unwrap();
+        let read_fields = physical_key_value_fields();
+        let stream = format_reader
+            .read_batch_stream(
+                Box::new(file_reader),
+                file_size as u64,
+                &read_fields,
+                None,
+                None,
+                None,
+            )
+            .await
+            .unwrap();
+        futures::TryStreamExt::try_collect(stream).await.unwrap()
+    }
+
+    fn collect_i32(batches: &[RecordBatch], column: usize) -> Vec<i32> {
+        batches
+            .iter()
+            .flat_map(|batch| {
+                batch
+                    .column(column)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap()
+                    .values()
+                    .iter()
+                    .copied()
+            })
+            .collect()
+    }
+
+    fn collect_i64(batches: &[RecordBatch], column: usize) -> Vec<i64> {
+        batches
+            .iter()
+            .flat_map(|batch| {
+                batch
+                    .column(column)
+                    .as_any()
+                    .downcast_ref::<Int64Array>()
+                    .unwrap()
+                    .values()
+                    .iter()
+                    .copied()
+            })
+            .collect()
+    }
+
+    fn collect_i8(batches: &[RecordBatch], column: usize) -> Vec<i8> {
+        batches
+            .iter()
+            .flat_map(|batch| {
+                batch
+                    .column(column)
+                    .as_any()
+                    .downcast_ref::<Int8Array>()
+                    .unwrap()
+                    .values()
+                    .iter()
+                    .copied()
+            })
+            .collect()
+    }
+
     fn make_partitioned_batch(pts: Vec<&str>, ids: Vec<i32>) -> RecordBatch {
         let schema = Arc::new(ArrowSchema::new(vec![
             ArrowField::new("pt", ArrowDataType::Utf8, false),
@@ -1396,6 +1552,320 @@ mod tests {
         )
     }
 
+    fn pk_changelog_schema(options: &[(&str, &str)]) -> TableSchema {
+        let mut builder = Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .primary_key(["id"])
+            .option("bucket", "1");
+        for (key, value) in options {
+            builder = builder.option(*key, *value);
+        }
+        TableSchema::new(0, &builder.build().unwrap())
+    }
+
+    fn ordinary_dynamic_pk_changelog_schema() -> TableSchema {
+        let schema = Schema::builder()
+            .column("pt", DataType::VarChar(VarCharType::string_type()))
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .partition_keys(["pt"])
+            .primary_key(["pt", "id"])
+            .option("changelog-producer", "input")
+            .build()
+            .unwrap();
+        TableSchema::new(0, &schema)
+    }
+
+    #[tokio::test]
+    async fn test_input_changelog_writes_raw_rows_separately_from_data_rows() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_input_changelog_duplicate_pk";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = Table::new(
+            file_io.clone(),
+            Identifier::new("default", "test_input_changelog"),
+            table_path.to_string(),
+            pk_changelog_schema(&[
+                ("changelog-producer", "input"),
+                ("changelog-file.prefix", "custom-changelog-"),
+            ]),
+            None,
+        );
+
+        let mut table_write = TableWrite::new(&table, 
"test-user".to_string()).unwrap();
+        table_write
+            .write_arrow_batch(&make_batch(vec![1, 1], vec![10, 20]))
+            .await
+            .unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        assert_eq!(messages.len(), 1);
+        assert_eq!(messages[0].new_files.len(), 1);
+        assert_eq!(messages[0].new_files[0].row_count, 1);
+        assert_eq!(messages[0].new_changelog_files.len(), 1);
+        assert_eq!(messages[0].new_changelog_files[0].row_count, 2);
+        assert!(messages[0].new_files[0].file_name.starts_with("data-"));
+        assert!(messages[0].new_changelog_files[0]
+            .file_name
+            .starts_with("custom-changelog-"));
+
+        let bucket_dir = bucket_dir_name(messages[0].bucket);
+        let data_file = &messages[0].new_files[0];
+        let data_file_path = format!("{table_path}/{bucket_dir}/{}", 
data_file.file_name);
+        let data_batches =
+            read_physical_key_value_batches(&file_io, &data_file_path, 
data_file.file_size).await;
+        assert_eq!(collect_i64(&data_batches, 0), vec![1]);
+        assert_eq!(collect_i8(&data_batches, 1), vec![0]);
+        assert_eq!(collect_i32(&data_batches, 2), vec![1]);
+        assert_eq!(collect_i32(&data_batches, 3), vec![20]);
+
+        let changelog_file = &messages[0].new_changelog_files[0];
+        let changelog_file_path = format!("{table_path}/{bucket_dir}/{}", 
changelog_file.file_name);
+        let changelog_batches = read_physical_key_value_batches(
+            &file_io,
+            &changelog_file_path,
+            changelog_file.file_size,
+        )
+        .await;
+        assert_eq!(collect_i64(&changelog_batches, 0), vec![0, 1]);
+        assert_eq!(collect_i8(&changelog_batches, 1), vec![0, 0]);
+        assert_eq!(collect_i32(&changelog_batches, 2), vec![1, 1]);
+        assert_eq!(collect_i32(&changelog_batches, 3), vec![10, 20]);
+    }
+
+    #[tokio::test]
+    async fn test_input_changelog_metadata_counts_retract_rows() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_input_changelog_retract_rows";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = Table::new(
+            file_io,
+            Identifier::new("default", "test_input_changelog"),
+            table_path.to_string(),
+            pk_changelog_schema(&[("changelog-producer", "input")]),
+            None,
+        );
+
+        let mut table_write = TableWrite::new(&table, 
"test-user".to_string()).unwrap();
+        table_write
+            .write_arrow_batch(&make_batch_with_value_kind(
+                vec![1, 2, 3],
+                vec![10, 20, 30],
+                vec![0, 1, 3],
+            ))
+            .await
+            .unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        assert_eq!(messages[0].new_files[0].delete_row_count, Some(2));
+        assert_eq!(messages[0].new_changelog_files[0].delete_row_count, 
Some(2));
+    }
+
+    #[tokio::test]
+    async fn test_input_changelog_rejects_invalid_value_kind() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_input_changelog_invalid_value_kind";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = Table::new(
+            file_io,
+            Identifier::new("default", "test_input_changelog"),
+            table_path.to_string(),
+            pk_changelog_schema(&[("changelog-producer", "input")]),
+            None,
+        );
+
+        let mut table_write = TableWrite::new(&table, 
"test-user".to_string()).unwrap();
+        table_write
+            .write_arrow_batch(&make_batch_with_value_kind(vec![1], vec![10], 
vec![4]))
+            .await
+            .unwrap();
+
+        let err = table_write.prepare_commit().await.unwrap_err();
+        assert!(
+            matches!(err, crate::Error::DataInvalid { message, .. } if 
message.contains("Invalid RowKind value"))
+        );
+    }
+
+    #[tokio::test]
+    async fn test_input_changelog_commit_writes_changelog_manifest_metadata() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_input_changelog_commit";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = Table::new(
+            file_io.clone(),
+            Identifier::new("default", "test_input_changelog"),
+            table_path.to_string(),
+            pk_changelog_schema(&[("changelog-producer", "input")]),
+            None,
+        );
+
+        let mut table_write = TableWrite::new(&table, 
"test-user".to_string()).unwrap();
+        table_write
+            .write_arrow_batch(&make_batch(vec![1, 1], vec![10, 20]))
+            .await
+            .unwrap();
+        let messages = table_write.prepare_commit().await.unwrap();
+        let data_file_name = messages[0].new_files[0].file_name.clone();
+        let changelog_file_name = 
messages[0].new_changelog_files[0].file_name.clone();
+
+        let commit = TableCommit::new(table, "test-user".to_string());
+        commit.commit(messages).await.unwrap();
+
+        let snap_manager = SnapshotManager::new(file_io.clone(), 
table_path.to_string());
+        let snapshot = 
snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+        assert_eq!(snapshot.total_record_count(), Some(1));
+        assert_eq!(snapshot.delta_record_count(), Some(1));
+        assert_eq!(snapshot.changelog_record_count(), Some(2));
+
+        let manifest_dir = format!("{table_path}/manifest");
+        let delta_metas = ManifestList::read(
+            &file_io,
+            &format!("{manifest_dir}/{}", snapshot.delta_manifest_list()),
+        )
+        .await
+        .unwrap();
+        let delta_entries = Manifest::read(
+            &file_io,
+            &format!("{manifest_dir}/{}", delta_metas[0].file_name()),
+        )
+        .await
+        .unwrap();
+        assert_eq!(delta_entries.len(), 1);
+        assert_eq!(*delta_entries[0].kind(), FileKind::Add);
+        assert_eq!(delta_entries[0].file().file_name, data_file_name);
+
+        let changelog_list = snapshot
+            .changelog_manifest_list()
+            .expect("changelog manifest list");
+        let changelog_metas =
+            ManifestList::read(&file_io, 
&format!("{manifest_dir}/{changelog_list}"))
+                .await
+                .unwrap();
+        let changelog_entries = Manifest::read(
+            &file_io,
+            &format!("{manifest_dir}/{}", changelog_metas[0].file_name()),
+        )
+        .await
+        .unwrap();
+        assert_eq!(changelog_entries.len(), 1);
+        assert_eq!(*changelog_entries[0].kind(), FileKind::Add);
+        assert_eq!(changelog_entries[0].file().file_name, changelog_file_name);
+        assert_eq!(changelog_entries[0].file().row_count, 2);
+    }
+
+    #[tokio::test]
+    async fn 
test_input_changelog_dynamic_bucket_commits_data_changelog_and_index() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_input_changelog_dynamic_bucket_commit";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = Table::new(
+            file_io.clone(),
+            Identifier::new("default", "test_input_changelog_dynamic_bucket"),
+            table_path.to_string(),
+            ordinary_dynamic_pk_changelog_schema(),
+            None,
+        );
+
+        let mut table_write = TableWrite::new(&table, 
"test-user".to_string()).unwrap();
+        assert!(matches!(
+            table_write.bucket_assigner,
+            BucketAssignerEnum::Dynamic(_)
+        ));
+        table_write
+            .write_arrow_batch(&make_partitioned_batch_with_value_kind(
+                vec!["a", "a"],
+                vec![1, 2],
+                vec![10, 20],
+                vec![0, 3],
+            ))
+            .await
+            .unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        assert_eq!(messages.len(), 1);
+        assert_eq!(messages[0].new_files.len(), 1);
+        assert_eq!(messages[0].new_files[0].row_count, 2);
+        assert_eq!(messages[0].new_files[0].delete_row_count, Some(1));
+        assert_eq!(messages[0].new_changelog_files.len(), 1);
+        assert_eq!(messages[0].new_changelog_files[0].row_count, 2);
+        assert_eq!(messages[0].new_changelog_files[0].delete_row_count, 
Some(1));
+        assert_eq!(messages[0].new_index_files.len(), 1);
+        assert_eq!(messages[0].new_index_files[0].index_type, "HASH");
+        assert_eq!(messages[0].new_index_files[0].row_count, 2);
+
+        let index_file_name = messages[0].new_index_files[0].file_name.clone();
+
+        let commit = TableCommit::new(table, "test-user".to_string());
+        commit.commit(messages).await.unwrap();
+
+        let snap_manager = SnapshotManager::new(file_io.clone(), 
table_path.to_string());
+        let snapshot = 
snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+        assert_eq!(snapshot.total_record_count(), Some(2));
+        assert_eq!(snapshot.delta_record_count(), Some(2));
+        assert_eq!(snapshot.changelog_record_count(), Some(2));
+
+        let manifest_dir = format!("{table_path}/manifest");
+        let changelog_list = snapshot
+            .changelog_manifest_list()
+            .expect("changelog manifest list");
+        let changelog_metas =
+            ManifestList::read(&file_io, 
&format!("{manifest_dir}/{changelog_list}"))
+                .await
+                .unwrap();
+        let changelog_partition_stats = changelog_metas[0].partition_stats();
+        assert!(!changelog_partition_stats.min_values().is_empty());
+        assert!(!changelog_partition_stats.max_values().is_empty());
+        assert_eq!(
+            changelog_partition_stats.null_counts().as_slice(),
+            &[Some(0)]
+        );
+
+        let index_manifest = snapshot.index_manifest().expect("index 
manifest");
+        let index_entries =
+            IndexManifest::read(&file_io, 
&format!("{manifest_dir}/{index_manifest}"))
+                .await
+                .unwrap();
+        assert_eq!(index_entries.len(), 1);
+        assert_eq!(index_entries[0].kind, FileKind::Add);
+        assert_eq!(index_entries[0].index_file.file_name, index_file_name);
+        assert_eq!(index_entries[0].index_file.index_type, "HASH");
+        assert_eq!(index_entries[0].index_file.row_count, 2);
+    }
+
+    #[tokio::test]
+    async fn test_input_changelog_overwrite_does_not_write_changelog_files() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_input_changelog_overwrite";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = Table::new(
+            file_io,
+            Identifier::new("default", "test_input_changelog"),
+            table_path.to_string(),
+            pk_changelog_schema(&[("changelog-producer", "input")]),
+            None,
+        );
+
+        let mut table_write = TableWrite::new(&table, "test-user".to_string())
+            .unwrap()
+            .with_overwrite();
+        table_write
+            .write_arrow_batch(&make_batch(vec![1], vec![10]))
+            .await
+            .unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        assert_eq!(messages.len(), 1);
+        assert_eq!(messages[0].new_files.len(), 1);
+        assert!(messages[0].new_changelog_files.is_empty());
+    }
+
     #[tokio::test]
     async fn test_pk_write_and_commit() {
         let file_io = test_file_io();

Reply via email to