This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 748b211  feat: support fixed-bucket partial-update merge engine (#263)
748b211 is described below

commit 748b21139608dd8bc6e6fb480e7c6c82361c4863
Author: QuakeWang <[email protected]>
AuthorDate: Tue May 12 14:59:03 2026 +0800

    feat: support fixed-bucket partial-update merge engine (#263)
---
 crates/integrations/datafusion/tests/pk_tables.rs | 102 +++-
 crates/paimon/src/spec/core_options.rs            |  11 +
 crates/paimon/src/spec/mod.rs                     |   3 +
 crates/paimon/src/spec/partial_update.rs          | 202 ++++++++
 crates/paimon/src/spec/schema.rs                  |  25 +
 crates/paimon/src/table/bucket_assigner_cross.rs  |  21 +-
 crates/paimon/src/table/cow_writer.rs             |   1 +
 crates/paimon/src/table/kv_file_reader.rs         |  35 +-
 crates/paimon/src/table/kv_file_writer.rs         | 275 +++++++++--
 crates/paimon/src/table/read_builder.rs           |  53 +++
 crates/paimon/src/table/sort_merge.rs             | 543 ++++++++++++++++++++--
 crates/paimon/src/table/table_read.rs             |  22 +-
 crates/paimon/src/table/table_scan.rs             |  54 ++-
 crates/paimon/src/table/table_write.rs            | 101 +++-
 14 files changed, 1334 insertions(+), 114 deletions(-)

diff --git a/crates/integrations/datafusion/tests/pk_tables.rs b/crates/integrations/datafusion/tests/pk_tables.rs
index fe09f98..3cc0c8b 100644
--- a/crates/integrations/datafusion/tests/pk_tables.rs
+++ b/crates/integrations/datafusion/tests/pk_tables.rs
@@ -31,7 +31,7 @@ use common::{
     collect_id_name, collect_id_value, create_sql_context, create_test_env, row_count,
     setup_sql_context,
 };
-use datafusion::arrow::array::{Int32Array, StringArray};
+use datafusion::arrow::array::{Array, Int32Array, StringArray};
 use paimon::catalog::Identifier;
 use paimon::Catalog;
 
@@ -76,6 +76,106 @@ async fn test_pk_basic_write_read() {
     );
 }
 
+/// Partial-update merge engine: keep latest non-null value for each field.
+#[tokio::test]
+async fn test_pk_partial_update_fixed_bucket_e2e() {
+    let (_tmp, handler) = setup_handler().await;
+
+    handler
+        .sql(
+            "CREATE TABLE paimon.test_db.t_partial_update (
+                id INT NOT NULL, v_int INT, v_str STRING,
+                PRIMARY KEY (id)
+            ) WITH ('bucket' = '1', 'merge-engine' = 'partial-update')",
+        )
+        .await
+        .unwrap();
+
+    handler
+        .sql(
+            "INSERT INTO paimon.test_db.t_partial_update VALUES
+             (1, 10, 'old-1'),
+             (2, 20, 'old-2')",
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    handler
+        .sql(
+            "INSERT INTO paimon.test_db.t_partial_update VALUES
+             (1, CAST(NULL AS INT), 'new-1'),
+             (2, 200, CAST(NULL AS STRING)),
+             (3, 30, CAST(NULL AS STRING))",
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    handler
+        .sql(
+            "INSERT INTO paimon.test_db.t_partial_update VALUES
+             (1, 111, CAST(NULL AS STRING))",
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    let batches = handler
+        .sql("SELECT id, v_int, v_str FROM paimon.test_db.t_partial_update ORDER BY id")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    let mut rows = Vec::new();
+    for batch in &batches {
+        let ids = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .unwrap();
+        let ints = batch
+            .column_by_name("v_int")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .unwrap();
+        let strs = batch
+            .column_by_name("v_str")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .unwrap();
+        for i in 0..batch.num_rows() {
+            rows.push((
+                ids.value(i),
+                if ints.is_null(i) {
+                    None
+                } else {
+                    Some(ints.value(i))
+                },
+                if strs.is_null(i) {
+                    None
+                } else {
+                    Some(strs.value(i).to_string())
+                },
+            ));
+        }
+    }
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, Some(111), Some("new-1".to_string())),
+            (2, Some(200), Some("old-2".to_string())),
+            (3, Some(30), None),
+        ]
+    );
+}
+
 // ======================= Dedup Within Single Commit =======================
 
 /// Duplicate keys in a single INSERT — last value wins (Deduplicate engine).
diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs
index 7cdbbc4..bafad0a 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -69,6 +69,8 @@ const BLOB_DESCRIPTOR_FIELD_OPTION: &str = "blob-descriptor-field";
 pub enum MergeEngine {
     /// Keep the row with the highest sequence number (default).
     Deduplicate,
+    /// Merge same-key rows field-by-field, usually keeping non-null updates.
+    PartialUpdate,
     /// Keep the first row for each key (ignore later updates).
     FirstRow,
 }
@@ -127,6 +129,7 @@ impl<'a> CoreOptions<'a> {
             None => Ok(MergeEngine::Deduplicate),
             Some(v) => match v.to_ascii_lowercase().as_str() {
                 "deduplicate" => Ok(MergeEngine::Deduplicate),
+                "partial-update" => Ok(MergeEngine::PartialUpdate),
                 "first-row" => Ok(MergeEngine::FirstRow),
                 other => Err(crate::Error::Unsupported {
                     message: format!("Unsupported merge-engine: '{other}'"),
@@ -535,6 +538,14 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_merge_engine_accepts_partial_update() {
+        let options = HashMap::from([(MERGE_ENGINE_OPTION.to_string(), "partial-update".into())]);
+        let core = CoreOptions::new(&options);
+
+        assert_eq!(core.merge_engine().unwrap(), MergeEngine::PartialUpdate);
+    }
+
     #[test]
     fn test_commit_options_defaults() {
         let options = HashMap::new();
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index 6ad6533..270b27f 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -32,6 +32,9 @@ mod core_options;
 pub(crate) use core_options::TimeTravelSelector;
 pub use core_options::*;
 
+mod partial_update;
+pub(crate) use partial_update::PartialUpdateConfig;
+
 mod schema;
 pub use schema::*;
 
diff --git a/crates/paimon/src/spec/partial_update.rs b/crates/paimon/src/spec/partial_update.rs
new file mode 100644
index 0000000..b7ae1b6
--- /dev/null
+++ b/crates/paimon/src/spec/partial_update.rs
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+const MERGE_ENGINE_OPTION: &str = "merge-engine";
+const PARTIAL_UPDATE_ENGINE: &str = "partial-update";
+const IGNORE_DELETE_OPTION: &str = "ignore-delete";
+const IGNORE_DELETE_SUFFIX: &str = ".ignore-delete";
+const PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE_OPTION: &str =
+    "partial-update.remove-record-on-delete";
+const PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP_OPTION: &str =
+    "partial-update.remove-record-on-sequence-group";
+const FIELDS_DEFAULT_AGG_FUNCTION_OPTION: &str = "fields.default-aggregate-function";
+const FIELDS_PREFIX: &str = "fields.";
+const SEQUENCE_GROUP_SUFFIX: &str = ".sequence-group";
+const AGGREGATION_FUNCTION_SUFFIX: &str = ".aggregate-function";
+
+/// Minimal partial-update mode recognized by the current Rust implementation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum PartialUpdateMode {
+    Basic,
+}
+
+/// Partial-update-specific option inspection and validation.
+///
+/// PR1 only recognizes the basic mode: `merge-engine=partial-update` on a PK
+/// table without delete, sequence-group, or aggregation controls.
+#[derive(Debug, Clone, Copy)]
+pub(crate) struct PartialUpdateConfig<'a> {
+    options: &'a HashMap<String, String>,
+}
+
+impl<'a> PartialUpdateConfig<'a> {
+    pub(crate) fn new(options: &'a HashMap<String, String>) -> Self {
+        Self { options }
+    }
+
+    pub(crate) fn is_enabled(&self) -> bool {
+        self.options
+            .get(MERGE_ENGINE_OPTION)
+            .is_some_and(|value| value.eq_ignore_ascii_case(PARTIAL_UPDATE_ENGINE))
+    }
+
+    pub(crate) fn validate_create_mode(
+        &self,
+        has_primary_keys: bool,
+    ) -> crate::Result<Option<PartialUpdateMode>> {
+        match self.validated_mode(has_primary_keys) {
+            Ok(mode) => Ok(mode),
+            Err(unsupported_options) => Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "merge-engine=partial-update only supports the basic mode in this build; unsupported options: {}",
+                    unsupported_options.join(", ")
+                ),
+            }),
+        }
+    }
+
+    pub(crate) fn validate_runtime_mode(
+        &self,
+        has_primary_keys: bool,
+        table_name: &str,
+    ) -> crate::Result<Option<PartialUpdateMode>> {
+        match self.validated_mode(has_primary_keys) {
+            Ok(mode) => Ok(mode),
+            Err(unsupported_options) => Err(crate::Error::Unsupported {
+                message: format!(
+                    "Table '{table_name}' uses merge-engine=partial-update options not supported by this build: {}",
+                    unsupported_options.join(", ")
+                ),
+            }),
+        }
+    }
+
+    fn validated_mode(
+        &self,
+        has_primary_keys: bool,
+    ) -> std::result::Result<Option<PartialUpdateMode>, Vec<String>> {
+        if !has_primary_keys || !self.is_enabled() {
+            return Ok(None);
+        }
+
+        let unsupported_options = self.unsupported_option_keys();
+        if !unsupported_options.is_empty() {
+            return Err(unsupported_options);
+        }
+
+        Ok(Some(PartialUpdateMode::Basic))
+    }
+
+    fn unsupported_option_keys(&self) -> Vec<String> {
+        let mut keys: Vec<String> = self
+            .options
+            .keys()
+            .filter(|key| is_unsupported_partial_update_option(key))
+            .cloned()
+            .collect();
+        keys.sort();
+        keys
+    }
+}
+
+fn is_unsupported_partial_update_option(key: &str) -> bool {
+    key == IGNORE_DELETE_OPTION
+        || key.ends_with(IGNORE_DELETE_SUFFIX)
+        || key == PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE_OPTION
+        || key == PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP_OPTION
+        || key == FIELDS_DEFAULT_AGG_FUNCTION_OPTION
+        || is_fields_option_with_suffix(key, SEQUENCE_GROUP_SUFFIX)
+        || is_fields_option_with_suffix(key, AGGREGATION_FUNCTION_SUFFIX)
+}
+
+fn is_fields_option_with_suffix(key: &str, suffix: &str) -> bool {
+    key.starts_with(FIELDS_PREFIX) && key.ends_with(suffix)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn partial_update_options(extra: &[(&str, &str)]) -> HashMap<String, String> {
+        let mut options = HashMap::from([(
+            MERGE_ENGINE_OPTION.to_string(),
+            PARTIAL_UPDATE_ENGINE.to_string(),
+        )]);
+        options.extend(
+            extra
+                .iter()
+                .map(|(key, value)| ((*key).to_string(), (*value).to_string())),
+        );
+        options
+    }
+
+    #[test]
+    fn test_validate_create_mode_accepts_basic_pk_partial_update() {
+        let options = partial_update_options(&[]);
+        let config = PartialUpdateConfig::new(&options);
+
+        assert_eq!(
+            config.validate_create_mode(true).unwrap(),
+            Some(PartialUpdateMode::Basic)
+        );
+    }
+
+    #[test]
+    fn test_validate_create_mode_ignores_non_pk_tables() {
+        let options = partial_update_options(&[(IGNORE_DELETE_OPTION, "true")]);
+        let config = PartialUpdateConfig::new(&options);
+
+        assert_eq!(config.validate_create_mode(false).unwrap(), None);
+    }
+
+    #[test]
+    fn test_validate_create_mode_rejects_unsupported_partial_update_options() {
+        for key in [
+            IGNORE_DELETE_OPTION,
+            "partial-update.ignore-delete",
+            PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE_OPTION,
+            PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP_OPTION,
+            "fields.price.sequence-group",
+            "fields.price.aggregate-function",
+            FIELDS_DEFAULT_AGG_FUNCTION_OPTION,
+        ] {
+            let options = partial_update_options(&[(key, "value")]);
+            let config = PartialUpdateConfig::new(&options);
+            let err = config.validate_create_mode(true).unwrap_err();
+
+            assert!(
+                matches!(err, crate::Error::ConfigInvalid { ref message } if message.contains(key)),
+                "expected create-time rejection to mention '{key}', got {err:?}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_validate_runtime_mode_rejects_unsupported_partial_update_options() {
+        let options =
+            partial_update_options(&[("fields.price.aggregate-function", "last_non_null")]);
+        let config = PartialUpdateConfig::new(&options);
+        let err = config.validate_runtime_mode(true, "default.t").unwrap_err();
+
+        assert!(
+            matches!(err, crate::Error::Unsupported { ref message } if message.contains("fields.price.aggregate-function")),
+            "expected runtime rejection to mention the unsupported option, got {err:?}"
+        );
+    }
+}
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index e09027b..ddada6a 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -17,6 +17,7 @@
 
 use crate::spec::core_options::CoreOptions;
 use crate::spec::types::{ArrayType, DataType, MapType, MultisetType, RowType};
+use crate::spec::PartialUpdateConfig;
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
 use std::collections::{HashMap, HashSet};
@@ -289,6 +290,7 @@ impl Schema {
         let partition_keys = Self::normalize_partition_keys(&partition_keys, &mut options)?;
         let fields = Self::normalize_fields(&fields, &partition_keys, &primary_keys)?;
         Self::validate_blob_fields(&fields, &partition_keys, &options)?;
+        PartialUpdateConfig::new(&options).validate_create_mode(!primary_keys.is_empty())?;
 
         Ok(Self {
             fields,
@@ -917,6 +919,29 @@ mod tests {
         assert_eq!(schema.fields().len(), 2);
     }
 
+    #[test]
+    fn test_partial_update_schema_validation_rejects_unsupported_options() {
+        for (key, value) in [
+            ("ignore-delete", "true"),
+            ("fields.value.sequence-group", "g1"),
+            ("fields.default-aggregate-function", "last_non_null"),
+        ] {
+            let err = Schema::builder()
+                .column("id", DataType::Int(IntType::new()))
+                .column("value", DataType::Int(IntType::new()))
+                .primary_key(["id"])
+                .option("merge-engine", "partial-update")
+                .option(key, value)
+                .build()
+                .unwrap_err();
+
+            assert!(
+                matches!(err, crate::Error::ConfigInvalid { ref message } if message.contains(key)),
+                "partial-update create-time validation should reject '{key}', got {err:?}"
+            );
+        }
+    }
+
     #[test]
     fn test_schema_builder_column_row_type() {
         let row_type = RowType::new(vec![DataField::new(
diff --git a/crates/paimon/src/table/bucket_assigner_cross.rs b/crates/paimon/src/table/bucket_assigner_cross.rs
index e60c9eb..a021761 100644
--- a/crates/paimon/src/table/bucket_assigner_cross.rs
+++ b/crates/paimon/src/table/bucket_assigner_cross.rs
@@ -147,19 +147,19 @@ impl GlobalPartitionIndex {
     }
 
     /// Assign a bucket for the given primary key targeting `new_partition`.
-    fn assign(&mut self, pk_bytes: &[u8], new_partition: &[u8]) -> AssignResult {
+    fn assign(&mut self, pk_bytes: &[u8], new_partition: &[u8]) -> Result<AssignResult> {
         if let Some((existing_partition, existing_bucket)) = self.key_to_location.get(pk_bytes) {
             if existing_partition == new_partition {
-                return AssignResult::SamePartition {
+                return Ok(AssignResult::SamePartition {
                     bucket: *existing_bucket,
-                };
+                });
             }
 
             // Key exists in a different partition
             match self.merge_engine {
                 MergeEngine::FirstRow => {
                     // FIRST_ROW: keep old data, discard new row
-                    return AssignResult::Skip;
+                    return Ok(AssignResult::Skip);
                 }
                 MergeEngine::Deduplicate => {
                     let old_partition = existing_partition.clone();
@@ -181,11 +181,16 @@ impl GlobalPartitionIndex {
                     self.key_to_location
                         .insert(pk_bytes.to_vec(), (new_partition.to_vec(), new_bucket));
 
-                    return AssignResult::CrossPartition {
+                    return Ok(AssignResult::CrossPartition {
                         old_partition,
                         old_bucket,
                         new_bucket,
-                    };
+                    });
+                }
+                MergeEngine::PartialUpdate => {
+                    return Err(crate::Error::Unsupported {
+                        message: "CrossPartitionAssigner does not support merge-engine=partial-update yet".to_string(),
+                    });
                 }
             }
         }
@@ -193,7 +198,7 @@ impl GlobalPartitionIndex {
         let bucket = self.assign_bucket_in_partition(new_partition);
         self.key_to_location
             .insert(pk_bytes.to_vec(), (new_partition.to_vec(), bucket));
-        AssignResult::SamePartition { bucket }
+        Ok(AssignResult::SamePartition { bucket })
     }
 
     fn assign_bucket_in_partition(&mut self, partition: &[u8]) -> i32 {
@@ -309,7 +314,7 @@ impl BucketAssigner for CrossPartitionAssigner {
         let mut skips = Vec::new();
 
         for row_idx in 0..num_rows {
-            match global_index.assign(&pk_bytes_vec[row_idx], &partition_bytes_vec[row_idx]) {
+            match global_index.assign(&pk_bytes_vec[row_idx], &partition_bytes_vec[row_idx])? {
                 AssignResult::SamePartition { bucket } => {
                     buckets.push(bucket);
                 }
diff --git a/crates/paimon/src/table/cow_writer.rs b/crates/paimon/src/table/cow_writer.rs
index c2b5e95..7006f0f 100644
--- a/crates/paimon/src/table/cow_writer.rs
+++ b/crates/paimon/src/table/cow_writer.rs
@@ -223,6 +223,7 @@ impl CopyOnWriteMergeWriter {
         let target_file_size = core_options.target_file_size();
         let file_compression = core_options.file_compression().to_string();
         let file_compression_zstd_level = core_options.file_compression_zstd_level();
+        let file_format = core_options.file_format().to_string();
         let write_buffer_size = core_options.write_parquet_buffer_size();
         let file_format = core_options.file_format().to_string();
         let schema_id = schema.id();
diff --git a/crates/paimon/src/table/kv_file_reader.rs b/crates/paimon/src/table/kv_file_reader.rs
index 1ad0f08..128b2da 100644
--- a/crates/paimon/src/table/kv_file_reader.rs
+++ b/crates/paimon/src/table/kv_file_reader.rs
@@ -24,11 +24,13 @@
 //! Reference: Java Paimon `SortMergeReaderWithMinHeap`.
 
 use super::data_file_reader::DataFileReader;
-use super::sort_merge::{DeduplicateMergeFunction, SortMergeReaderBuilder};
+use super::sort_merge::{
+    DeduplicateMergeFunction, PartialUpdateMergeFunction, SortMergeReaderBuilder,
+};
 use crate::arrow::build_target_arrow_schema;
 use crate::io::FileIO;
 use crate::spec::{
-    BigIntType, DataField, DataType as PaimonDataType, Predicate, TinyIntType,
+    BigIntType, DataField, DataType as PaimonDataType, MergeEngine, Predicate, TinyIntType,
     SEQUENCE_NUMBER_FIELD_ID, SEQUENCE_NUMBER_FIELD_NAME, VALUE_KIND_FIELD_ID,
     VALUE_KIND_FIELD_NAME,
 };
@@ -39,6 +41,7 @@ use arrow_array::RecordBatch;
 
 use async_stream::try_stream;
 use futures::StreamExt;
+use std::collections::HashMap;
 
 /// Reads primary-key table data files using sort-merge deduplication.
 pub(crate) struct KeyValueFileReader {
@@ -49,12 +52,15 @@ pub(crate) struct KeyValueFileReader {
 /// Configuration for [`KeyValueFileReader`], grouping table schema and
 /// key/predicate parameters.
 pub(crate) struct KeyValueReadConfig {
+    pub table_name: String,
+    pub table_options: HashMap<String, String>,
     pub schema_manager: SchemaManager,
     pub table_schema_id: i64,
     pub table_fields: Vec<DataField>,
     pub read_type: Vec<DataField>,
     pub predicates: Vec<Predicate>,
     pub primary_keys: Vec<String>,
+    pub merge_engine: MergeEngine,
     pub sequence_fields: Vec<String>,
 }
 
@@ -92,6 +98,23 @@ impl KeyValueFileReader {
         }
     }
 
+    fn new_merge_function(
+        merge_engine: MergeEngine,
+        table_options: &HashMap<String, String>,
+        table_name: &str,
+    ) -> crate::Result<Box<dyn super::sort_merge::MergeFunction>> {
+        match merge_engine {
+            MergeEngine::Deduplicate => Ok(Box::new(DeduplicateMergeFunction)),
+            MergeEngine::PartialUpdate => Ok(Box::new(PartialUpdateMergeFunction::new(
+                table_options,
+                table_name,
+            )?)),
+            MergeEngine::FirstRow => Err(Error::Unsupported {
+                message: "KeyValueFileReader does not support merge-engine=first-row; first-row reads should use the non-KV path".to_string(),
+            }),
+        }
+    }
+
     pub fn read(self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
         // Build the internal read type for thin-mode files.
         // Physical file schema: [_SEQUENCE_NUMBER, _VALUE_KIND, all_user_cols...]
@@ -234,9 +257,12 @@ impl KeyValueFileReader {
 
         let splits: Vec<DataSplit> = data_splits.to_vec();
         let file_io = self.file_io;
+        let merge_engine = self.config.merge_engine;
         let schema_manager = self.config.schema_manager;
         let table_schema_id = self.config.table_schema_id;
         let table_fields = self.config.table_fields;
+        let table_name = self.config.table_name;
+        let table_options = self.config.table_options;
         let predicates = self.config.predicates;
 
         // Build the merge output schema (keys + values, no system columns).
@@ -252,9 +278,8 @@ impl KeyValueFileReader {
                     .data_deletion_files()
                     .is_some_and(|files| files.iter().any(Option::is_some))
                 {
-                    Err(Error::UnexpectedError {
+                    Err(Error::Unsupported {
                         message: "KeyValueFileReader does not support deletion vectors".to_string(),
-                        source: None,
                     })?;
                 }
 
@@ -303,7 +328,7 @@ impl KeyValueFileReader {
                     user_sequence_indices.clone(),
                     value_indices.clone(),
                     merge_output_schema.clone(),
-                    Box::new(DeduplicateMergeFunction),
+                    Self::new_merge_function(merge_engine, &table_options, &table_name)?,
                 )
                 .build()?;
 
diff --git a/crates/paimon/src/table/kv_file_writer.rs b/crates/paimon/src/table/kv_file_writer.rs
index 2004d6f..6b43445 100644
--- a/crates/paimon/src/table/kv_file_writer.rs
+++ b/crates/paimon/src/table/kv_file_writer.rs
@@ -31,7 +31,7 @@ use crate::io::FileIO;
 use crate::spec::stats::{compute_column_stats, BinaryTableStats};
 use crate::spec::{
     extract_datum_from_arrow, BinaryRowBuilder, DataFileMeta, DataType, MergeEngine,
-    EMPTY_SERIALIZED_ROW, SEQUENCE_NUMBER_FIELD_NAME, VALUE_KIND_FIELD_NAME,
+    PartialUpdateConfig, EMPTY_SERIALIZED_ROW, SEQUENCE_NUMBER_FIELD_NAME, VALUE_KIND_FIELD_NAME,
 };
 use crate::Result;
 use arrow_array::{Int64Array, Int8Array, RecordBatch};
@@ -39,6 +39,7 @@ use arrow_ord::sort::{lexsort_to_indices, SortColumn, SortOptions};
 use arrow_row::{RowConverter, SortField};
 use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
 use chrono::Utc;
+use std::collections::HashMap;
 use std::sync::Arc;
 
 /// Internal writer for primary-key tables that buffers data in memory,
@@ -59,6 +60,8 @@ pub(crate) struct KeyValueFileWriter {
 /// Configuration for [`KeyValueFileWriter`], grouping file-location, schema,
 /// and key/merge parameters.
 pub(crate) struct KeyValueWriteConfig {
+    pub table_name: String,
+    pub table_options: HashMap<String, String>,
     pub table_location: String,
     pub partition_path: String,
     pub bucket: i32,
@@ -75,6 +78,8 @@ pub(crate) struct KeyValueWriteConfig {
     pub sequence_field_indices: Vec<usize>,
     /// Merge engine for deduplication.
     pub merge_engine: MergeEngine,
+    pub dynamic_bucket_enabled: bool,
+    pub deletion_vectors_enabled: bool,
 }
 
 impl KeyValueFileWriter {
@@ -82,15 +87,38 @@ impl KeyValueFileWriter {
         file_io: FileIO,
         config: KeyValueWriteConfig,
         next_sequence_number: i64,
-    ) -> Self {
-        Self {
+    ) -> Result<Self> {
+        if config.merge_engine == MergeEngine::PartialUpdate {
+            PartialUpdateConfig::new(&config.table_options)
+                .validate_runtime_mode(true, &config.table_name)?;
+
+            if config.deletion_vectors_enabled {
+                return Err(crate::Error::Unsupported {
+                    message: format!(
+                        "Table '{}' uses merge-engine=partial-update with deletion-vectors.enabled=true, which is not supported yet",
+                        config.table_name
+                    ),
+                });
+            }
+
+            if config.dynamic_bucket_enabled {
+                return Err(crate::Error::Unsupported {
+                    message: format!(
+                        "Table '{}' uses merge-engine=partial-update with bucket=-1, which is not supported yet; currently only fixed-bucket partial-update is supported",
+                        config.table_name
+                    ),
+                });
+            }
+        }
+
+        Ok(Self {
             file_io,
             config,
             next_sequence_number,
             buffer: Vec::new(),
             buffer_bytes: 0,
             written_files: Vec::new(),
-        }
+        })
     }
 
     /// Buffer a RecordBatch. Flushes when buffer exceeds write_buffer_size.
@@ -185,16 +213,16 @@ impl KeyValueFileWriter {
                 source: None,
             })?;
 
-        // Deduplicate: for consecutive rows with the same PK, pick the winner.
         // After sorting by PK + seq fields + auto-seq (all ascending):
-        //   Deduplicate → keep last row per key group (highest seq)
-        //   FirstRow    → keep first row per key group (lowest seq)
-        let deduped_indices = self.dedup_sorted_indices(&combined, &sorted_indices)?;
-        let deduped_num_rows = deduped_indices.len();
-
-        // Extract min_key / max_key from deduped endpoints.
-        let first_row = deduped_indices[0] as usize;
-        let last_row = deduped_indices[deduped_num_rows - 1] as usize;
+        //   Deduplicate   → keep last row per key group (highest seq)
+        //   FirstRow      → keep first row per key group (lowest seq)
+        //   PartialUpdate → keep all rows for read-side field-wise merge
+        let selected_indices = self.select_flush_indices(&combined, &sorted_indices)?;
+        let selected_num_rows = selected_indices.len();
+
+        // Extract min_key / max_key from selected endpoints.
+        let first_row = selected_indices[0] as usize;
+        let last_row = selected_indices[selected_num_rows - 1] as usize;
         let min_key = self.extract_key_binary_row(&combined, first_row)?;
         let max_key = self.extract_key_binary_row(&combined, last_row)?;
 
@@ -231,11 +259,11 @@ impl KeyValueFileWriter {
         )
         .await?;
 
-        // Chunked write using deduped indices.
-        let deduped_u32 = arrow_array::UInt32Array::from(deduped_indices);
-        for chunk_start in (0..deduped_num_rows).step_by(Self::FLUSH_CHUNK_ROWS) {
-            let chunk_len = Self::FLUSH_CHUNK_ROWS.min(deduped_num_rows - chunk_start);
-            let chunk_indices = deduped_u32.slice(chunk_start, chunk_len);
+        // Chunked write using selected indices.
+        let selected_u32 = arrow_array::UInt32Array::from(selected_indices);
+        for chunk_start in (0..selected_num_rows).step_by(Self::FLUSH_CHUNK_ROWS) {
+            let chunk_len = Self::FLUSH_CHUNK_ROWS.min(selected_num_rows - chunk_start);
+            let chunk_indices = selected_u32.slice(chunk_start, chunk_len);
 
             let mut physical_columns: Vec<Arc<dyn arrow_array::Array>> = Vec::new();
             // Sequence numbers for this chunk.
@@ -296,20 +324,20 @@ impl KeyValueFileWriter {
 
         let file_size = writer.close().await? as i64;
 
-        // Compute key_stats on deduped data (not the raw combined batch).
-        let deduped_key_columns: Vec<Arc<dyn arrow_array::Array>> =
-            self.config
-                .primary_key_indices
-                .iter()
-                .map(|&idx| {
-                    arrow_select::take::take(combined.column(idx).as_ref(), &deduped_u32, None)
-                        .map_err(|e| crate::Error::DataInvalid {
-                            message: format!("Failed to take key column for stats: {e}"),
-                            source: None,
-                        })
-                })
-                .collect::<Result<Vec<_>>>()?;
-        let deduped_key_batch = RecordBatch::try_new(
+        // Compute key_stats on selected output rows (not the raw combined batch).
+        let selected_key_columns: Vec<Arc<dyn arrow_array::Array>> = self
+            .config
+            .primary_key_indices
+            .iter()
+            .map(|&idx| {
+                arrow_select::take::take(combined.column(idx).as_ref(), &selected_u32, None)
+                    .map_err(|e| crate::Error::DataInvalid {
+                        message: format!("Failed to take key column for stats: {e}"),
+                        source: None,
+                    })
+            })
+            .collect::<Result<Vec<_>>>()?;
+        let selected_key_batch = RecordBatch::try_new(
             Arc::new(ArrowSchema::new(
                 self.config
                     .primary_key_indices
@@ -317,15 +345,15 @@ impl KeyValueFileWriter {
                     .map(|&idx| user_schema.field(idx).clone())
                     .collect::<Vec<_>>(),
             )),
-            deduped_key_columns,
+            selected_key_columns,
         )
         .map_err(|e| crate::Error::DataInvalid {
-            message: format!("Failed to build deduped key batch for stats: 
{e}"),
+            message: format!("Failed to build selected key batch for stats: 
{e}"),
             source: None,
         })?;
         let stats_col_indices: Vec<usize> = 
(0..self.config.primary_key_indices.len()).collect();
         let key_stats = compute_column_stats(
-            &deduped_key_batch,
+            &selected_key_batch,
             &stats_col_indices,
             &self.config.primary_key_types,
         )?;
@@ -334,7 +362,7 @@ impl KeyValueFileWriter {
         let meta = DataFileMeta {
             file_name,
             file_size,
-            row_count: deduped_num_rows as i64,
+            row_count: selected_num_rows as i64,
             min_key,
             max_key,
             key_stats,
@@ -361,7 +389,26 @@ impl KeyValueFileWriter {
         Ok(())
     }
 
-    /// Deduplicate sorted indices by primary key using the configured merge 
engine.
+    /// Pick the rows that a flush writes out, based on the configured merge engine.
+    ///
+    /// `sorted_indices` must be ordered by PK + seq fields + auto-seq (all
+    /// ascending). The returned row indices preserve that sorted PK order:
+    /// Deduplicate / FirstRow delegate to `dedup_sorted_indices`, while
+    /// PartialUpdate skips dedup and returns every index unchanged.
+    fn select_flush_indices(
+        &self,
+        batch: &RecordBatch,
+        sorted_indices: &arrow_array::UInt32Array,
+    ) -> Result<Vec<u32>> {
+        match self.config.merge_engine {
+            MergeEngine::PartialUpdate => {
+                // No per-key collapsing here: copy the sorted indices through as-is.
+                let every_row: Vec<u32> = sorted_indices.values().iter().copied().collect();
+                Ok(every_row)
+            }
+            MergeEngine::Deduplicate | MergeEngine::FirstRow => {
+                self.dedup_sorted_indices(batch, sorted_indices)
+            }
+        }
+    }
+
+    /// Deduplicate sorted indices by primary key for Deduplicate / FirstRow 
engines.
     ///
     /// Input: `sorted_indices` ordered by PK + seq fields + auto-seq (all 
ascending).
     /// Output: a Vec<u32> of original row indices to keep, in sorted PK order.
@@ -415,6 +462,9 @@ impl KeyValueFileWriter {
                     MergeEngine::Deduplicate => group_winner = cur,
                     // FirstRow: keep first (lowest seq), so don't update.
                     MergeEngine::FirstRow => {}
+                    MergeEngine::PartialUpdate => unreachable!(
+                        "partial-update should use select_flush_indices and 
skip dedup"
+                    ),
                 }
             } else {
                 // New key group — emit the winner of the previous group.
@@ -473,3 +523,154 @@ pub(crate) fn build_physical_schema(user_schema: 
&ArrowSchema) -> Arc<ArrowSchem
     }
     Arc::new(ArrowSchema::new(physical_fields))
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::io::FileIOBuilder;
+    use crate::spec::IntType;
+    use arrow_array::{Int32Array, UInt32Array};
+    use std::collections::HashMap;
+
+    /// Build a `KeyValueWriteConfig` fixture for the given `merge_engine`.
+    ///
+    /// Column 0 is the primary key and column 1 the sequence field. The
+    /// `merge-engine` table option is set only for partial-update; every
+    /// other engine gets an empty option map.
+    fn test_write_config(merge_engine: MergeEngine) -> KeyValueWriteConfig {
+        let table_options: HashMap<String, String> = (merge_engine
+            == MergeEngine::PartialUpdate)
+            .then(|| ("merge-engine".to_string(), "partial-update".to_string()))
+            .into_iter()
+            .collect();
+
+        KeyValueWriteConfig {
+            table_name: "default.test_table".to_string(),
+            table_options,
+            table_location: "memory:/kv-test".to_string(),
+            partition_path: String::new(),
+            bucket: 0,
+            schema_id: 0,
+            file_compression: "none".to_string(),
+            file_compression_zstd_level: 0,
+            write_buffer_size: 1024,
+            file_format: "parquet".to_string(),
+            primary_key_indices: vec![0],
+            primary_key_types: vec![DataType::Int(IntType::new())],
+            sequence_field_indices: vec![1],
+            merge_engine,
+            dynamic_bucket_enabled: false,
+            deletion_vectors_enabled: false,
+        }
+    }
+
+    /// Writer fixture using the first-row merge engine on a `memory` `FileIO`.
+    fn first_row_writer() -> KeyValueFileWriter {
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+        let config = test_write_config(MergeEngine::FirstRow);
+        KeyValueFileWriter::new(file_io, config, 0).unwrap()
+    }
+
+    /// With the first-row engine, each key group keeps its first sorted row.
+    #[test]
+    fn test_dedup_sorted_indices_keeps_first_row_for_first_row_engine() {
+        // Two key groups (id 1 and id 2), two rows each, already in sorted order.
+        let id: Arc<dyn arrow_array::Array> = Arc::new(Int32Array::from(vec![1, 1, 2, 2]));
+        let seq: Arc<dyn arrow_array::Array> = Arc::new(Int64Array::from(vec![10, 20, 5, 6]));
+        let value: Arc<dyn arrow_array::Array> =
+            Arc::new(Int32Array::from(vec![100, 200, 300, 400]));
+        let fields = vec![
+            Arc::new(ArrowField::new("id", ArrowDataType::Int32, false)),
+            Arc::new(ArrowField::new("seq", ArrowDataType::Int64, false)),
+            Arc::new(ArrowField::new("value", ArrowDataType::Int32, false)),
+        ];
+        let batch =
+            RecordBatch::try_new(Arc::new(ArrowSchema::new(fields)), vec![id, seq, value])
+                .unwrap();
+        let sorted_indices = UInt32Array::from(vec![0, 1, 2, 3]);
+
+        let writer = first_row_writer();
+        let kept = writer
+            .dedup_sorted_indices(&batch, &sorted_indices)
+            .unwrap();
+
+        assert_eq!(kept, vec![0, 2]);
+    }
+
+    /// With the partial-update engine, `select_flush_indices` returns every
+    /// sorted index, including rows that share a primary key.
+    #[test]
+    fn test_select_flush_indices_keeps_all_rows_for_partial_update_engine() {
+        // Both rows share id = 1 and differ only in their sequence value.
+        let id: Arc<dyn arrow_array::Array> = Arc::new(Int32Array::from(vec![1, 1]));
+        let seq: Arc<dyn arrow_array::Array> = Arc::new(Int64Array::from(vec![10, 20]));
+        let fields = vec![
+            Arc::new(ArrowField::new("id", ArrowDataType::Int32, false)),
+            Arc::new(ArrowField::new("seq", ArrowDataType::Int64, false)),
+        ];
+        let batch =
+            RecordBatch::try_new(Arc::new(ArrowSchema::new(fields)), vec![id, seq]).unwrap();
+        let sorted_indices = UInt32Array::from(vec![0, 1]);
+
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+        let config = test_write_config(MergeEngine::PartialUpdate);
+        let writer = KeyValueFileWriter::new(file_io, config, 0).unwrap();
+
+        let selected = writer
+            .select_flush_indices(&batch, &sorted_indices)
+            .unwrap();
+
+        assert_eq!(selected, vec![0, 1]);
+    }
+
+    /// Creating a partial-update writer with dynamic buckets enabled must fail
+    /// with an `Unsupported` error whose message mentions `bucket=-1`.
+    #[test]
+    fn test_new_rejects_partial_update_dynamic_bucket() {
+        let config = KeyValueWriteConfig {
+            dynamic_bucket_enabled: true,
+            ..test_write_config(MergeEngine::PartialUpdate)
+        };
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+
+        let err = KeyValueFileWriter::new(file_io, config, 0).err().unwrap();
+
+        assert!(matches!(
+            err,
+            crate::Error::Unsupported { message } if message.contains("bucket=-1")
+        ));
+    }
+
+    /// Creating a partial-update writer with deletion vectors enabled must fail
+    /// with an `Unsupported` error naming `deletion-vectors.enabled=true`.
+    #[test]
+    fn test_new_rejects_partial_update_with_deletion_vectors() {
+        let config = KeyValueWriteConfig {
+            deletion_vectors_enabled: true,
+            ..test_write_config(MergeEngine::PartialUpdate)
+        };
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+
+        let err = KeyValueFileWriter::new(file_io, config, 0).err().unwrap();
+
+        assert!(matches!(
+            err,
+            crate::Error::Unsupported { message }
+            if message.contains("deletion-vectors.enabled=true")
+        ));
+    }
+
+    /// A per-field aggregate-function option alongside partial-update must be
+    /// rejected with an `Unsupported` error that names the offending key.
+    #[test]
+    fn test_new_rejects_unsupported_partial_update_options() {
+        let table_options = HashMap::from([
+            ("merge-engine".to_string(), "partial-update".to_string()),
+            (
+                "fields.price.aggregate-function".to_string(),
+                "last_non_null".to_string(),
+            ),
+        ]);
+        let config = KeyValueWriteConfig {
+            table_options,
+            ..test_write_config(MergeEngine::PartialUpdate)
+        };
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+
+        let err = KeyValueFileWriter::new(file_io, config, 0).err().unwrap();
+
+        assert!(matches!(
+            err,
+            crate::Error::Unsupported { message }
+            if message.contains("fields.price.aggregate-function")
+        ));
+    }
+}
diff --git a/crates/paimon/src/table/read_builder.rs 
b/crates/paimon/src/table/read_builder.rs
index b53e767..3e7684d 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -342,6 +342,28 @@ mod tests {
         )
     }
 
+    /// Table fixture `default.partial_update_dv_t`: primary key `id`, with
+    /// `merge-engine=partial-update` and `deletion-vectors.enabled=true`.
+    fn partial_update_dv_pk_table() -> Table {
+        let file_io = FileIOBuilder::new("file").build().unwrap();
+        let schema = Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .primary_key(["id"])
+            .option("merge-engine", "partial-update")
+            .option("deletion-vectors.enabled", "true")
+            .build()
+            .unwrap();
+        let table_schema = TableSchema::new(0, &schema);
+        let identifier = Identifier::new("default", "partial_update_dv_t");
+        let location = "/tmp/test-partial-update-dv-read-builder".to_string();
+
+        Table::new(file_io, identifier, location, table_schema, None)
+    }
+
     #[test]
     fn test_exact_filter_pushdown_is_true_for_partition_only_filter() {
         let table = simple_table();
@@ -699,4 +721,35 @@ mod tests {
         assert_eq!(ranges[0].from(), 0);
         assert_eq!(ranges[0].to(), 5);
     }
+
+    /// Reading a split that carries a deletion file from a partial-update
+    /// table must fail with `Unsupported` mentioning deletion vectors.
+    #[tokio::test]
+    async fn test_direct_table_read_rejects_partial_update_with_deletion_vectors() {
+        let table = partial_update_dv_pk_table();
+        let deletion_file = crate::table::source::DeletionFile::new(
+            "/tmp/test-partial-update-dv-read-builder/index/dv".to_string(),
+            0,
+            0,
+            None,
+        );
+        let split = DataSplitBuilder::new()
+            .with_snapshot(1)
+            .with_partition(BinaryRow::new(0))
+            .with_bucket(0)
+            .with_bucket_path("/tmp/test-partial-update-dv-read-builder/bucket-0".to_string())
+            .with_total_buckets(1)
+            .with_data_files(vec![test_data_file("data.parquet", 1, 0)])
+            .with_data_deletion_files(vec![Some(deletion_file)])
+            .build()
+            .unwrap();
+
+        // Keep the split array and reader in named locals so they outlive the stream.
+        let splits = [split];
+        let read_fields = table.schema().fields().to_vec();
+        let table_read = TableRead::new(&table, read_fields, Vec::new());
+        let err = table_read
+            .to_arrow(&splits)
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap_err();
+
+        assert!(
+            matches!(err, crate::Error::Unsupported { ref message } if message.contains("deletion vectors")),
+            "expected partial-update+DV read to fail fast with Unsupported, got {err:?}"
+        );
+    }
 }
diff --git a/crates/paimon/src/table/sort_merge.rs 
b/crates/paimon/src/table/sort_merge.rs
index b7dcbd2..8a9ece4 100644
--- a/crates/paimon/src/table/sort_merge.rs
+++ b/crates/paimon/src/table/sort_merge.rs
@@ -26,21 +26,47 @@
 //! - DataFusion: `SortPreservingMergeStream` (LoserTree layout)
 //! - Arrow-row: `RowConverter` for efficient key comparison
 
-use crate::spec::RowKind;
+use crate::spec::{PartialUpdateConfig, RowKind};
 use crate::table::ArrowRecordBatchStream;
 use crate::Error;
-use arrow_array::{ArrayRef, Int64Array, Int8Array, RecordBatch};
+use arrow_array::{new_null_array, ArrayRef, Int64Array, Int8Array, 
RecordBatch};
 use arrow_row::{RowConverter, Rows, SortField};
 use arrow_schema::SchemaRef;
 use arrow_select::interleave::interleave;
 use async_stream::try_stream;
 use futures::StreamExt;
 use std::cmp::Ordering;
+use std::collections::HashMap;
 
 // ---------------------------------------------------------------------------
 // MergeFunction
 // ---------------------------------------------------------------------------
 
+/// A batch held in the merge reader's buffer.
+///
+/// `Source` batches keep the internal read schema, so their output columns
+/// are located through an index mapping. `Materialized` batches already match
+/// the merge output schema and are indexed directly.
+#[derive(Clone)]
+pub(crate) enum BufferedBatch {
+    Source(RecordBatch),
+    Materialized(RecordBatch),
+}
+
+impl BufferedBatch {
+    /// Return the array backing output column `output_col_idx`.
+    ///
+    /// For `Source` batches the physical column is
+    /// `source_output_col_indices[output_col_idx]`; for `Materialized`
+    /// batches it is `output_col_idx` itself and the mapping is not consulted.
+    fn column_for_output<'a>(
+        &'a self,
+        output_col_idx: usize,
+        source_output_col_indices: &[usize],
+    ) -> &'a dyn arrow_array::Array {
+        let (batch, physical_idx) = match self {
+            Self::Source(batch) => (batch, source_output_col_indices[output_col_idx]),
+            Self::Materialized(batch) => (batch, output_col_idx),
+        };
+        batch.column(physical_idx).as_ref()
+    }
+}
+
 /// A row reference as an index into the batch buffer.
 pub(crate) struct MergeRow {
     /// Index into the shared batch buffer.
@@ -52,15 +78,55 @@ pub(crate) struct MergeRow {
     pub user_sequences: Vec<Option<i128>>,
 }
 
+#[cfg(test)]
+impl MergeRow {
+    /// Look up the source batch this row points at in `batch_buffer`.
+    ///
+    /// Fails with `UnexpectedError` when `batch_idx` is outside the buffer or
+    /// when the referenced entry is a materialized batch, not a source one.
+    fn source_batch<'a>(
+        &self,
+        batch_buffer: &'a [BufferedBatch],
+    ) -> crate::Result<&'a RecordBatch> {
+        let buffered = batch_buffer
+            .get(self.batch_idx)
+            .ok_or_else(|| Error::UnexpectedError {
+                message: format!(
+                    "Merge row referenced batch index {} outside the current buffer",
+                    self.batch_idx
+                ),
+                source: None,
+            })?;
+
+        match buffered {
+            BufferedBatch::Source(batch) => Ok(batch),
+            BufferedBatch::Materialized(_) => Err(Error::UnexpectedError {
+                message: format!(
+                    "Merge row unexpectedly referenced a materialized batch at index {}",
+                    self.batch_idx
+                ),
+                source: None,
+            }),
+        }
+    }
+}
+
+/// Outcome of merging all rows that share the same primary key.
+pub(crate) enum MergeResult {
+    /// Reuse an existing source row from the batch buffer, addressed by the
+    /// buffer index of its batch and the row index within that batch.
+    SourceRow { batch_idx: usize, row_idx: usize },
+    /// Emit a synthesized one-row batch matching the merge output schema.
+    MaterializedRow(RecordBatch),
+    /// Omit this key from the output.
+    Omit,
+}
+
 /// Merge function applied to rows sharing the same primary key.
 ///
-/// For deduplicate: returns the single winner (batch_idx, row_idx), or None
-/// if the winning row should be filtered out (e.g. DELETE).
+/// Deduplicate-style engines return an existing source row, while field-wise
+/// engines such as partial-update materialize a new output row instead.
 pub(crate) trait MergeFunction: Send + Sync {
-    /// Pick the winning row from same-key candidates.
-    /// Returns `Some((batch_idx, row_idx))` of the winner, or `None` if the
-    /// key should be omitted from output (e.g. winner is a DELETE row).
-    fn pick_winner(&self, rows: &[MergeRow]) -> crate::Result<Option<(usize, 
usize)>>;
+    /// Merge all rows sharing the same key into a final output result.
+    fn merge(
+        &self,
+        rows: &[MergeRow],
+        batch_buffer: &[BufferedBatch],
+        source_output_col_indices: &[usize],
+        output_schema: &SchemaRef,
+    ) -> crate::Result<MergeResult>;
 }
 
 /// Deduplicate merge: keeps the row with the highest sequence.
@@ -71,19 +137,28 @@ pub(crate) trait MergeFunction: Send + Sync {
 /// Filters out DELETE and UPDATE_BEFORE rows.
 pub(crate) struct DeduplicateMergeFunction;
 
+/// Order two rows by sequence.
+///
+/// When both rows carry user sequences, those are compared first and the
+/// system sequence number breaks ties. If either row has no user sequences,
+/// only the system sequence number is compared.
+fn compare_sequence_order(lhs: &MergeRow, rhs: &MergeRow) -> Ordering {
+    let by_system_sequence = lhs.sequence_number.cmp(&rhs.sequence_number);
+    if lhs.user_sequences.is_empty() || rhs.user_sequences.is_empty() {
+        return by_system_sequence;
+    }
+    lhs.user_sequences
+        .cmp(&rhs.user_sequences)
+        .then(by_system_sequence)
+}
+
 impl MergeFunction for DeduplicateMergeFunction {
-    fn pick_winner(&self, rows: &[MergeRow]) -> crate::Result<Option<(usize, 
usize)>> {
+    fn merge(
+        &self,
+        rows: &[MergeRow],
+        _batch_buffer: &[BufferedBatch],
+        _source_output_col_indices: &[usize],
+        _output_schema: &SchemaRef,
+    ) -> crate::Result<MergeResult> {
         let winner = rows
             .iter()
             .reduce(|best, r| {
-                // Compare user sequences lexicographically first (if 
present), then system sequence.
-                let ord = match (r.user_sequences.is_empty(), 
best.user_sequences.is_empty()) {
-                    (false, false) => r
-                        .user_sequences
-                        .cmp(&best.user_sequences)
-                        .then_with(|| 
r.sequence_number.cmp(&best.sequence_number)),
-                    _ => r.sequence_number.cmp(&best.sequence_number),
-                };
+                let ord = compare_sequence_order(r, best);
                 // >= semantics: last-writer-wins for equal values.
                 if ord.is_ge() {
                     r
@@ -93,13 +168,110 @@ impl MergeFunction for DeduplicateMergeFunction {
             })
             .expect("merge called with empty rows");
         if RowKind::from_value(winner.value_kind)?.is_add() {
-            Ok(Some((winner.batch_idx, winner.row_idx)))
+            Ok(MergeResult::SourceRow {
+                batch_idx: winner.batch_idx,
+                row_idx: winner.row_idx,
+            })
         } else {
-            Ok(None)
+            Ok(MergeResult::Omit)
         }
     }
 }
 
+/// Basic partial-update merge function.
+///
+/// For every output column it keeps the latest non-null value, where "latest"
+/// is decided by user sequence (if configured) and then system sequence.
+/// DELETE / UPDATE_BEFORE rows are treated as unsupported in this mode.
+///
+/// The private unit field means code outside this module has to go through
+/// [`PartialUpdateMergeFunction::new`], which validates the table options.
+#[derive(Debug, Clone, Copy)]
+pub(crate) struct PartialUpdateMergeFunction(());
+
+impl PartialUpdateMergeFunction {
+    /// Validate `table_options` for partial-update and build the merge function.
+    ///
+    /// Returns whatever error `PartialUpdateConfig::validate_runtime_mode`
+    /// reports for `table_name`.
+    pub(crate) fn new(
+        table_options: &HashMap<String, String>,
+        table_name: &str,
+    ) -> crate::Result<Self> {
+        let config = PartialUpdateConfig::new(table_options);
+        config.validate_runtime_mode(true, table_name)?;
+        Ok(Self(()))
+    }
+}
+
+impl MergeFunction for PartialUpdateMergeFunction {
+    /// Merge same-key rows into one materialized row, column by column.
+    ///
+    /// Rows are visited in ascending sequence order, so the last non-null
+    /// value seen for a column is the one that ends up in the output.
+    ///
+    /// # Errors
+    /// - `UnexpectedError` if `rows` is empty or the output batch cannot be built.
+    /// - `Unsupported` if any row is not an add-kind row.
+    /// - `DataInvalid` if a non-nullable output field has no non-null value.
+    fn merge(
+        &self,
+        rows: &[MergeRow],
+        batch_buffer: &[BufferedBatch],
+        source_output_col_indices: &[usize],
+        output_schema: &SchemaRef,
+    ) -> crate::Result<MergeResult> {
+        if rows.is_empty() {
+            return Err(Error::UnexpectedError {
+                message: "merge called with empty rows".to_string(),
+                source: None,
+            });
+        }
+
+        // Visit order: ascending sequence, ties broken by position in `rows`.
+        let mut visit_order: Vec<usize> = (0..rows.len()).collect();
+        visit_order.sort_by(|&a, &b| {
+            compare_sequence_order(&rows[a], &rows[b]).then_with(|| a.cmp(&b))
+        });
+
+        // Per output column: (batch_idx, row_idx) of the latest non-null value.
+        let num_output_cols = output_schema.fields().len();
+        let mut winners: Vec<Option<(usize, usize)>> = vec![None; num_output_cols];
+
+        for position in visit_order {
+            let candidate = &rows[position];
+            let kind = RowKind::from_value(candidate.value_kind)?;
+            if !kind.is_add() {
+                return Err(crate::Error::Unsupported {
+                    message: "merge-engine=partial-update basic mode does not support DELETE or UPDATE_BEFORE rows".to_string(),
+                });
+            }
+
+            for (output_col_idx, winner) in winners.iter_mut().enumerate() {
+                let column = batch_buffer[candidate.batch_idx]
+                    .column_for_output(output_col_idx, source_output_col_indices);
+                if column.is_null(candidate.row_idx) {
+                    continue;
+                }
+                // A later non-null value overrides any earlier one.
+                *winner = Some((candidate.batch_idx, candidate.row_idx));
+            }
+        }
+
+        let mut output_columns: Vec<ArrayRef> = Vec::with_capacity(num_output_cols);
+        for (output_col_idx, field) in output_schema.fields().iter().enumerate() {
+            let column = match winners[output_col_idx] {
+                Some((batch_idx, row_idx)) => batch_buffer[batch_idx]
+                    .column_for_output(output_col_idx, source_output_col_indices)
+                    .slice(row_idx, 1),
+                // No row supplied a value: only legal for nullable fields.
+                None if field.is_nullable() => new_null_array(field.data_type(), 1),
+                None => {
+                    return Err(Error::DataInvalid {
+                        message: format!(
+                            "merge-engine=partial-update produced NULL for non-nullable field '{}'",
+                            field.name()
+                        ),
+                        source: None,
+                    });
+                }
+            };
+            output_columns.push(column);
+        }
+
+        let merged = RecordBatch::try_new(output_schema.clone(), output_columns).map_err(|e| {
+            Error::UnexpectedError {
+                message: format!("Failed to build partial-update materialized row: {e}"),
+                source: Some(Box::new(e)),
+            }
+        })?;
+
+        Ok(MergeResult::MaterializedRow(merged))
+    }
+}
+
 // ---------------------------------------------------------------------------
 // SortMergeCursor
 // ---------------------------------------------------------------------------
@@ -318,7 +490,7 @@ impl SortMergeReaderBuilder {
         }
     }
 
-    #[allow(dead_code)]
+    #[cfg(test)]
     pub(crate) fn with_batch_size(mut self, batch_size: usize) -> Self {
         self.batch_size = batch_size;
         self
@@ -405,8 +577,9 @@ fn sort_merge_stream(
         return Ok(futures::stream::empty().boxed());
     }
 
-    // Output column indices: key columns + value columns (skip 
_SEQUENCE_NUMBER).
-    let output_col_indices: Vec<usize> = key_indices
+    // Output column indices for source batches: key columns + value columns
+    // (skip system columns like _SEQUENCE_NUMBER).
+    let source_output_col_indices: Vec<usize> = key_indices
         .iter()
         .chain(value_indices.iter())
         .copied()
@@ -440,7 +613,7 @@ fn sort_merge_stream(
         // Each cursor's current batch gets an entry; when a cursor advances
         // to a new batch, the old one stays in the buffer until the output
         // batch is flushed.
-        let mut batch_buffer: Vec<RecordBatch> = Vec::new();
+        let mut batch_buffer: Vec<BufferedBatch> = Vec::new();
         // Map from stream_idx -> current batch_buffer index.
         let mut stream_batch_idx: Vec<Option<usize>> = vec![None; num_streams];
 
@@ -448,7 +621,7 @@ fn sort_merge_stream(
         for (i, cursor) in cursors.iter().enumerate() {
             if let Some(c) = cursor {
                 let idx = batch_buffer.len();
-                batch_buffer.push(c.batch.clone());
+                batch_buffer.push(BufferedBatch::Source(c.batch.clone()));
                 stream_batch_idx[i] = Some(idx);
             }
         }
@@ -508,7 +681,7 @@ fn sort_merge_stream(
                             if batch.num_rows() > 0 {
                                 let rows = convert_batch_keys(&batch, 
&key_indices, &mut row_converter)?;
                                 let buf_idx = batch_buffer.len();
-                                batch_buffer.push(batch.clone());
+                                
batch_buffer.push(BufferedBatch::Source(batch.clone()));
                                 stream_batch_idx[current_winner] = 
Some(buf_idx);
                                 cursors[current_winner] = Some(SortMergeCursor 
{ batch, rows, offset: 0 });
                                 break;
@@ -521,10 +694,36 @@ fn sort_merge_stream(
                 tree.update(|a, b| compare_cursors(&cursors, a, 
b).then_with(|| a.cmp(&b)).is_gt());
             }
 
-            // Apply merge function to pick the winner row.
-            // Returns None if the winning row is a DELETE/UPDATE_BEFORE — 
skip it.
-            if let Some((win_batch_idx, win_row_idx)) = 
merge_function.pick_winner(&same_key_rows)? {
-                output_indices.push((win_batch_idx, win_row_idx));
+            match merge_function.merge(
+                &same_key_rows,
+                &batch_buffer,
+                &source_output_col_indices,
+                &output_schema,
+            )? {
+                MergeResult::SourceRow { batch_idx, row_idx } => {
+                    output_indices.push((batch_idx, row_idx));
+                }
+                MergeResult::MaterializedRow(batch) => {
+                    if batch.num_rows() != 1 {
+                        Err(Error::UnexpectedError {
+                            message: format!(
+                                "Materialized merge result must contain 
exactly one row, got {}",
+                                batch.num_rows()
+                            ),
+                            source: None,
+                        })?;
+                    }
+                    if batch.schema().as_ref() != output_schema.as_ref() {
+                        Err(Error::UnexpectedError {
+                            message: "Materialized merge result schema does 
not match merge output schema".to_string(),
+                            source: None,
+                        })?;
+                    }
+                    let batch_idx = batch_buffer.len();
+                    batch_buffer.push(BufferedBatch::Materialized(batch));
+                    output_indices.push((batch_idx, 0));
+                }
+                MergeResult::Omit => {}
             }
 
             // Yield a batch when we've accumulated enough rows.
@@ -532,13 +731,14 @@ fn sort_merge_stream(
                 let batch = build_output_interleave(
                     &output_schema,
                     &batch_buffer,
-                    &output_col_indices,
+                    &source_output_col_indices,
                     &output_indices,
                 )?;
                 output_indices.clear();
-                // Compact batch buffer: only keep batches still referenced by 
cursors.
-                // SAFETY: output_indices was just cleared above, so no stale 
references
-                // exist into the buffer. The yield below happens after 
compaction.
+                // Compact batch buffer after the pending output rows have been
+                // materialized. Source batches still referenced by cursors 
stay
+                // alive; materialized batches can be dropped here because they
+                // are referenced only by the flushed output_indices above.
                 compact_batch_buffer(
                     &mut batch_buffer,
                     &mut stream_batch_idx,
@@ -553,7 +753,7 @@ fn sort_merge_stream(
             let batch = build_output_interleave(
                 &output_schema,
                 &batch_buffer,
-                &output_col_indices,
+                &source_output_col_indices,
                 &output_indices,
             )?;
             yield batch;
@@ -566,20 +766,18 @@ fn sort_merge_stream(
 /// batch buffer in one pass per column.
 fn build_output_interleave(
     schema: &SchemaRef,
-    batch_buffer: &[RecordBatch],
-    output_col_indices: &[usize],
+    batch_buffer: &[BufferedBatch],
+    source_output_col_indices: &[usize],
     indices: &[(usize, usize)],
 ) -> crate::Result<RecordBatch> {
-    let columns: Vec<ArrayRef> = output_col_indices
-        .iter()
-        .map(|&col_idx| {
-            // Collect all arrays for this column from the batch buffer.
+    let columns: Vec<ArrayRef> = (0..schema.fields().len())
+        .map(|output_col_idx| {
             let arrays: Vec<&dyn arrow_array::Array> = batch_buffer
                 .iter()
-                .map(|b| b.column(col_idx).as_ref())
+                .map(|batch| batch.column_for_output(output_col_idx, 
source_output_col_indices))
                 .collect();
             interleave(&arrays, indices).map_err(|e| Error::UnexpectedError {
-                message: format!("Failed to interleave column {col_idx}: {e}"),
+                message: format!("Failed to interleave output column 
{output_col_idx}: {e}"),
                 source: Some(Box::new(e)),
             })
         })
@@ -594,7 +792,7 @@ fn build_output_interleave(
 /// Compact the batch buffer by removing batches no longer referenced by any
 /// cursor, and updating indices accordingly.
 fn compact_batch_buffer(
-    batch_buffer: &mut Vec<RecordBatch>,
+    batch_buffer: &mut Vec<BufferedBatch>,
     stream_batch_idx: &mut [Option<usize>],
     cursors: &[Option<SortMergeCursor>],
 ) {
@@ -610,7 +808,7 @@ fn compact_batch_buffer(
 
     // Build old->new index mapping.
     let mut new_indices: Vec<Option<usize>> = vec![None; batch_buffer.len()];
-    let mut new_buffer: Vec<RecordBatch> = Vec::new();
+    let mut new_buffer: Vec<BufferedBatch> = Vec::new();
     for (old_idx, is_alive) in alive.iter().enumerate() {
         if *is_alive {
             new_indices[old_idx] = Some(new_buffer.len());
@@ -642,6 +840,7 @@ mod tests {
     use arrow_array::{Array, Int32Array, Int64Array, Int8Array, StringArray};
     use arrow_schema::{DataType, Field, Schema};
     use futures::TryStreamExt;
+    use std::collections::HashMap;
     use std::sync::Arc;
 
     fn make_schema() -> SchemaRef {
@@ -693,6 +892,41 @@ mod tests {
         futures::stream::iter(batches.into_iter().map(Ok)).boxed()
     }
 
+    /// Test merge function that always returns a materialized row: the key of
+    /// the first same-key row paired with the constant value `"merged"`.
+    struct MaterializingMergeFunction;
+
+    impl MergeFunction for MaterializingMergeFunction {
+        fn merge(
+            &self,
+            rows: &[MergeRow],
+            batch_buffer: &[BufferedBatch],
+            source_output_col_indices: &[usize],
+            output_schema: &SchemaRef,
+        ) -> crate::Result<MergeResult> {
+            let first = rows.first().expect("merge called with empty rows");
+
+            // Read the Int32 primary key of the first row from its source batch.
+            let pk_column = first
+                .source_batch(batch_buffer)?
+                .column(source_output_col_indices[0]);
+            let pk = pk_column
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .expect("pk column must be Int32")
+                .value(first.row_idx);
+
+            let key_array = Arc::new(Int32Array::from(vec![pk])) as ArrayRef;
+            let value_array = Arc::new(StringArray::from(vec![Some("merged")])) as ArrayRef;
+            let merged = RecordBatch::try_new(output_schema.clone(), vec![key_array, value_array])
+                .map_err(|e| Error::UnexpectedError {
+                    message: format!("Failed to build materialized merge batch: {e}"),
+                    source: Some(Box::new(e)),
+                })?;
+
+            Ok(MergeResult::MaterializedRow(merged))
+        }
+    }
+
     #[tokio::test]
     async fn test_loser_tree_basic() {
         // 3 streams, verify init produces correct winner
@@ -1264,4 +1498,225 @@ mod tests {
         assert_eq!(pks, vec![1, 2, 3, 4]);
         assert_eq!(values, vec!["a", "b", "c", "d"]);
     }
+
+    #[tokio::test]
+    async fn test_materialized_merge_result_path() {
+        let schema = make_schema();
+        let s0 = stream_from_batches(vec![make_batch(
+            &schema,
+            vec![1, 2],
+            vec![1, 1],
+            vec![Some("old_a"), Some("old_b")],
+        )]);
+        let s1 = stream_from_batches(vec![make_batch(
+            &schema,
+            vec![1, 3],
+            vec![2, 1],
+            vec![Some("new_a"), Some("c")],
+        )]);
+
+        let result = SortMergeReaderBuilder::new(
+            vec![s0, s1],
+            schema,
+            vec![0],
+            1,
+            2,
+            vec![],
+            vec![3],
+            make_output_schema(),
+            Box::new(MaterializingMergeFunction),
+        )
+        .build()
+        .unwrap()
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+        let pks: Vec<i32> = result
+            .iter()
+            .flat_map(|b| {
+                b.column(0)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap()
+                    .values()
+                    .iter()
+                    .copied()
+            })
+            .collect();
+        let values: Vec<String> = result
+            .iter()
+            .flat_map(|b| {
+                let arr = b.column(1).as_any().downcast_ref::<StringArray>().unwrap();
+                (0..arr.len())
+                    .map(|i| arr.value(i).to_string())
+                    .collect::<Vec<_>>()
+            })
+            .collect();
+
+        assert_eq!(pks, vec![1, 2, 3]);
+        assert_eq!(values, vec!["merged", "merged", "merged"]);
+    }
+
+    #[tokio::test]
+    async fn test_partial_update_merge_keeps_latest_non_null_values() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("pk", DataType::Int32, false),
+            Field::new("_SEQUENCE_NUMBER", DataType::Int64, false),
+            Field::new("_VALUE_KIND", DataType::Int8, false),
+            Field::new("v_int", DataType::Int32, true),
+            Field::new("v_str", DataType::Utf8, true),
+        ]));
+        let output_schema = Arc::new(Schema::new(vec![
+            Field::new("pk", DataType::Int32, false),
+            Field::new("v_int", DataType::Int32, true),
+            Field::new("v_str", DataType::Utf8, true),
+        ]));
+
+        let s0 = stream_from_batches(vec![RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2])),
+                Arc::new(Int64Array::from(vec![1, 1])),
+                Arc::new(Int8Array::from(vec![0, 0])),
+                Arc::new(Int32Array::from(vec![10, 20])),
+                Arc::new(StringArray::from(vec![Some("old-1"), Some("old-2")])),
+            ],
+        )
+        .unwrap()]);
+        let s1 = stream_from_batches(vec![RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(Int64Array::from(vec![2, 2, 1])),
+                Arc::new(Int8Array::from(vec![0, 0, 0])),
+                Arc::new(Int32Array::from(vec![None, Some(200), Some(30)])),
+                Arc::new(StringArray::from(vec![Some("new-1"), None, None])),
+            ],
+        )
+        .unwrap()]);
+
+        let result = SortMergeReaderBuilder::new(
+            vec![s0, s1],
+            schema,
+            vec![0],
+            1,
+            2,
+            vec![],
+            vec![3, 4],
+            output_schema,
+            Box::new(PartialUpdateMergeFunction::new(&HashMap::new(), "test_table").unwrap()),
+        )
+        .build()
+        .unwrap()
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+        let mut rows: Vec<(i32, Option<i32>, Option<String>)> = Vec::new();
+        for batch in &result {
+            let ids = batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap();
+            let ints = batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap();
+            let strs = batch
+                .column(2)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap();
+            for i in 0..batch.num_rows() {
+                rows.push((
+                    ids.value(i),
+                    if ints.is_null(i) {
+                        None
+                    } else {
+                        Some(ints.value(i))
+                    },
+                    if strs.is_null(i) {
+                        None
+                    } else {
+                        Some(strs.value(i).to_string())
+                    },
+                ));
+            }
+        }
+        rows.sort_by_key(|row| row.0);
+
+        assert_eq!(
+            rows,
+            vec![
+                (1, Some(10), Some("new-1".to_string())),
+                (2, Some(200), Some("old-2".to_string())),
+                (3, Some(30), None),
+            ]
+        );
+    }
+
+    #[tokio::test]
+    async fn test_partial_update_merge_rejects_delete_like_rows() {
+        let schema = make_schema();
+        let output_schema = make_output_schema();
+        let s0 = stream_from_batches(vec![make_batch_with_kind(
+            &schema,
+            vec![1],
+            vec![1],
+            vec![0],
+            vec![Some("old")],
+        )]);
+        let s1 = stream_from_batches(vec![make_batch_with_kind(
+            &schema,
+            vec![1],
+            vec![2],
+            vec![3],
+            vec![Some("delete")],
+        )]);
+
+        let err = SortMergeReaderBuilder::new(
+            vec![s0, s1],
+            schema,
+            vec![0],
+            1,
+            2,
+            vec![],
+            vec![3],
+            output_schema,
+            Box::new(PartialUpdateMergeFunction::new(&HashMap::new(), "test_table").unwrap()),
+        )
+        .build()
+        .unwrap()
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap_err();
+
+        assert!(matches!(
+            err,
+            Error::Unsupported { message }
+            if message.contains("partial-update basic mode does not support DELETE or UPDATE_BEFORE")
+        ));
+    }
+
+    #[test]
+    fn test_partial_update_merge_function_new_rejects_unsupported_options() {
+        let options = HashMap::from([
+            ("merge-engine".to_string(), "partial-update".to_string()),
+            (
+                "fields.price.aggregate-function".to_string(),
+                "last_non_null".to_string(),
+            ),
+        ]);
+
+        let err = PartialUpdateMergeFunction::new(&options, "default.t").unwrap_err();
+
+        assert!(matches!(
+            err,
+            Error::Unsupported { message }
+            if message.contains("fields.price.aggregate-function")
+        ));
+    }
 }
diff --git a/crates/paimon/src/table/table_read.rs b/crates/paimon/src/table/table_read.rs
index 078f942..5493938 100644
--- a/crates/paimon/src/table/table_read.rs
+++ b/crates/paimon/src/table/table_read.rs
@@ -21,7 +21,7 @@ use super::kv_file_reader::{KeyValueFileReader, KeyValueReadConfig};
 use super::read_builder::split_scan_predicates;
 use super::{ArrowRecordBatchStream, Table};
 use crate::arrow::filtering::reader_pruning_predicates;
-use crate::spec::{CoreOptions, DataField, Predicate};
+use crate::spec::{CoreOptions, DataField, MergeEngine, Predicate};
 use crate::DataSplit;
 
 /// Table read: reads data from splits (e.g. produced by [TableScan::plan]).
@@ -74,17 +74,18 @@ impl<'a> TableRead<'a> {
     pub fn to_arrow(&self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
         let has_primary_keys = !self.table.schema.primary_keys().is_empty();
         let core_options = CoreOptions::new(self.table.schema.options());
+        let merge_engine = core_options.merge_engine()?;
 
         // PK table with Deduplicate engine: splits containing level-0 files
         // need KeyValueFileReader for sort-merge dedup; splits with only
         // compacted files (level > 0) can use the faster DataFileReader.
-        // FirstRow engine falls through — scan already skips level-0.
         if has_primary_keys
-            && core_options
-                .merge_engine()
-                .is_ok_and(|e| e == crate::spec::MergeEngine::Deduplicate)
+            && matches!(
+                merge_engine,
+                MergeEngine::Deduplicate | MergeEngine::PartialUpdate
+            )
         {
-            return self.read_pk_deduplicate(data_splits, &core_options);
+            return self.read_pk(data_splits, &core_options);
         }
 
         if core_options.data_evolution_enabled() {
@@ -96,11 +97,15 @@ impl<'a> TableRead<'a> {
 
     /// Read PK table with Deduplicate engine: level-0 splits go through
     /// KeyValueFileReader for sort-merge dedup, compacted splits use DataFileReader.
-    fn read_pk_deduplicate(
+    fn read_pk(
         &self,
         data_splits: &[DataSplit],
         core_options: &CoreOptions,
     ) -> crate::Result<ArrowRecordBatchStream> {
+        if core_options.merge_engine()? == MergeEngine::PartialUpdate {
+            return self.read_kv(data_splits, core_options);
+        }
+
         let mut kv_splits = Vec::new();
         let mut raw_splits = Vec::new();
         for split in data_splits {
@@ -134,12 +139,15 @@ impl<'a> TableRead<'a> {
         let reader = KeyValueFileReader::new(
             self.table.file_io.clone(),
             KeyValueReadConfig {
+                table_name: self.table.identifier().full_name(),
+                table_options: self.table.schema().options().clone(),
                 schema_manager: self.table.schema_manager().clone(),
                 table_schema_id: self.table.schema().id(),
                 table_fields: self.table.schema.fields().to_vec(),
                 read_type: self.read_type().to_vec(),
                 predicates: self.data_predicates.clone(),
                 primary_keys: self.table.schema.trimmed_primary_keys(),
+                merge_engine: core_options.merge_engine()?,
                 sequence_fields: core_options
                     .sequence_fields()
                     .iter()
diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs
index a5ff103..be0d2df 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -284,6 +284,22 @@ pub(super) fn can_push_down_limit_hint_for_scan(
     data_predicates.is_empty() && row_ranges.is_none()
 }
 
+fn should_skip_level_zero_for_scan(
+    scan_all_files: bool,
+    has_primary_keys: bool,
+    deletion_vectors_enabled: bool,
+    merge_engine: crate::Result<crate::spec::MergeEngine>,
+) -> bool {
+    if scan_all_files {
+        return false;
+    }
+    if !has_primary_keys {
+        return false;
+    }
+
+    deletion_vectors_enabled || merge_engine.is_ok_and(|e| e == crate::spec::MergeEngine::FirstRow)
+}
+
 /// TableScan for full table scan (no incremental, no predicate).
 ///
 /// Reference: [pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/table_scan.py)
@@ -474,16 +490,12 @@ impl<'a> TableScan<'a> {
         //
         // Non-read paths (overwrite, truncate, writer restore) set scan_all_files=true
         // to see all files including level-0, matching Java's CommitScanner behavior.
-        let skip_level_zero = if self.scan_all_files {
-            false
-        } else if has_primary_keys {
-            deletion_vectors_enabled
-                || core_options
-                    .merge_engine()
-                    .is_ok_and(|e| e == crate::spec::MergeEngine::FirstRow)
-        } else {
-            false
-        };
+        let skip_level_zero = should_skip_level_zero_for_scan(
+            self.scan_all_files,
+            has_primary_keys,
+            deletion_vectors_enabled,
+            core_options.merge_engine(),
+        );
 
         let partition_fields = self.table.schema().partition_fields();
 
@@ -768,7 +780,7 @@ impl<'a> TableScan<'a> {
 
 #[cfg(test)]
 mod tests {
-    use super::TableScan;
+    use super::{should_skip_level_zero_for_scan, TableScan};
     use crate::catalog::Identifier;
     use crate::io::FileIOBuilder;
     use crate::spec::{
@@ -986,6 +998,26 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_first_row_skips_level_zero_by_default() {
+        assert!(should_skip_level_zero_for_scan(
+            false,
+            true,
+            false,
+            Ok(crate::spec::MergeEngine::FirstRow),
+        ));
+    }
+
+    #[test]
+    fn test_scan_all_files_disables_first_row_level_zero_skip() {
+        assert!(!should_skip_level_zero_for_scan(
+            true,
+            true,
+            false,
+            Ok(crate::spec::MergeEngine::FirstRow),
+        ));
+    }
+
     #[test]
     fn test_partition_filter_decode_failure_fails_open() {
         let fields = partition_string_field();
diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs
index f817b75..b89a1c9 100644
--- a/crates/paimon/src/table/table_write.rs
+++ b/crates/paimon/src/table/table_write.rs
@@ -662,6 +662,8 @@ impl TableWrite {
         Ok(FileWriter::KeyValue(KeyValueFileWriter::new(
             self.table.file_io().clone(),
             KeyValueWriteConfig {
+                table_name: self.table.identifier().full_name(),
+                table_options: self.table.schema().options().clone(),
                 table_location: self.table.location().to_string(),
                 partition_path,
                 bucket,
@@ -674,9 +676,15 @@ impl TableWrite {
                 primary_key_types: self.primary_key_types.clone(),
                 sequence_field_indices: self.sequence_field_indices.clone(),
                 merge_engine: self.merge_engine,
+                dynamic_bucket_enabled: matches!(
+                    self.bucket_assigner,
+                    BucketAssignerEnum::Dynamic(_) | BucketAssignerEnum::CrossPartition(_)
+                ),
+                deletion_vectors_enabled: CoreOptions::new(self.table.schema().options())
+                    .deletion_vectors_enabled(),
             },
             next_seq,
-        )))
+        )?))
     }
 }
 
@@ -898,6 +906,97 @@ mod tests {
         assert_eq!(snapshot.id(), 1);
     }
 
+    #[test]
+    fn test_allows_partial_update_fixed_bucket_table() {
+        let table = Table::new(
+            test_file_io(),
+            Identifier::new("default", "test_partial_update_table"),
+            "memory:/test_partial_update_table".to_string(),
+            TableSchema::new(
+                0,
+                &Schema::builder()
+                    .column("id", DataType::Int(IntType::new()))
+                    .column("value", DataType::Int(IntType::new()))
+                    .primary_key(["id"])
+                    .option("bucket", "1")
+                    .option("merge-engine", "partial-update")
+                    .build()
+                    .unwrap(),
+            ),
+            None,
+        );
+
+        TableWrite::new(&table, "test-user".to_string(), false).unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_rejects_partial_update_dynamic_bucket_table_when_creating_writer() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_partial_update_dynamic_bucket_table";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = Table::new(
+            file_io,
+            Identifier::new("default", "test_partial_update_dynamic_bucket_table"),
+            table_path.to_string(),
+            TableSchema::new(
+                0,
+                &Schema::builder()
+                    .column("id", DataType::Int(IntType::new()))
+                    .column("value", DataType::Int(IntType::new()))
+                    .primary_key(["id"])
+                    .option("merge-engine", "partial-update")
+                    .build()
+                    .unwrap(),
+            ),
+            None,
+        );
+
+        let mut table_write = TableWrite::new(&table, "test-user".to_string(), false).unwrap();
+        let err = table_write
+            .write_arrow_batch(&make_batch(vec![1], vec![10]))
+            .await
+            .unwrap_err();
+        assert!(
+            matches!(err, crate::Error::Unsupported { message } if message.contains("bucket=-1"))
+        );
+    }
+
+    #[tokio::test]
+    async fn test_rejects_partial_update_with_deletion_vectors_when_creating_writer() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_partial_update_dv_table";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = Table::new(
+            file_io,
+            Identifier::new("default", "test_partial_update_dv_table"),
+            table_path.to_string(),
+            TableSchema::new(
+                0,
+                &Schema::builder()
+                    .column("id", DataType::Int(IntType::new()))
+                    .column("value", DataType::Int(IntType::new()))
+                    .primary_key(["id"])
+                    .option("bucket", "1")
+                    .option("merge-engine", "partial-update")
+                    .option("deletion-vectors.enabled", "true")
+                    .build()
+                    .unwrap(),
+            ),
+            None,
+        );
+
+        let mut table_write = TableWrite::new(&table, "test-user".to_string(), false).unwrap();
+        let err = table_write
+            .write_arrow_batch(&make_batch(vec![1], vec![10]))
+            .await
+            .unwrap_err();
+        assert!(
+            matches!(err, crate::Error::Unsupported { message } if message.contains("deletion-vectors.enabled=true"))
+        );
+    }
+
     #[tokio::test]
     async fn test_write_partitioned() {
         let file_io = test_file_io();

Reply via email to