This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 748b211 feat: support fixed-bucket partial-update merge engine (#263)
748b211 is described below
commit 748b21139608dd8bc6e6fb480e7c6c82361c4863
Author: QuakeWang <[email protected]>
AuthorDate: Tue May 12 14:59:03 2026 +0800
feat: support fixed-bucket partial-update merge engine (#263)
---
crates/integrations/datafusion/tests/pk_tables.rs | 102 +++-
crates/paimon/src/spec/core_options.rs | 11 +
crates/paimon/src/spec/mod.rs | 3 +
crates/paimon/src/spec/partial_update.rs | 202 ++++++++
crates/paimon/src/spec/schema.rs | 25 +
crates/paimon/src/table/bucket_assigner_cross.rs | 21 +-
crates/paimon/src/table/cow_writer.rs | 1 +
crates/paimon/src/table/kv_file_reader.rs | 35 +-
crates/paimon/src/table/kv_file_writer.rs | 275 +++++++++--
crates/paimon/src/table/read_builder.rs | 53 +++
crates/paimon/src/table/sort_merge.rs | 543 ++++++++++++++++++++--
crates/paimon/src/table/table_read.rs | 22 +-
crates/paimon/src/table/table_scan.rs | 54 ++-
crates/paimon/src/table/table_write.rs | 101 +++-
14 files changed, 1334 insertions(+), 114 deletions(-)
diff --git a/crates/integrations/datafusion/tests/pk_tables.rs b/crates/integrations/datafusion/tests/pk_tables.rs
index fe09f98..3cc0c8b 100644
--- a/crates/integrations/datafusion/tests/pk_tables.rs
+++ b/crates/integrations/datafusion/tests/pk_tables.rs
@@ -31,7 +31,7 @@ use common::{
collect_id_name, collect_id_value, create_sql_context, create_test_env,
row_count,
setup_sql_context,
};
-use datafusion::arrow::array::{Int32Array, StringArray};
+use datafusion::arrow::array::{Array, Int32Array, StringArray};
use paimon::catalog::Identifier;
use paimon::Catalog;
@@ -76,6 +76,106 @@ async fn test_pk_basic_write_read() {
);
}
+/// Partial-update merge engine: keep latest non-null value for each field.
+#[tokio::test]
+async fn test_pk_partial_update_fixed_bucket_e2e() {
+ let (_tmp, handler) = setup_handler().await;
+
+ handler
+ .sql(
+ "CREATE TABLE paimon.test_db.t_partial_update (
+ id INT NOT NULL, v_int INT, v_str STRING,
+ PRIMARY KEY (id)
+ ) WITH ('bucket' = '1', 'merge-engine' = 'partial-update')",
+ )
+ .await
+ .unwrap();
+
+ handler
+ .sql(
+ "INSERT INTO paimon.test_db.t_partial_update VALUES
+ (1, 10, 'old-1'),
+ (2, 20, 'old-2')",
+ )
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ handler
+ .sql(
+ "INSERT INTO paimon.test_db.t_partial_update VALUES
+ (1, CAST(NULL AS INT), 'new-1'),
+ (2, 200, CAST(NULL AS STRING)),
+ (3, 30, CAST(NULL AS STRING))",
+ )
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ handler
+ .sql(
+ "INSERT INTO paimon.test_db.t_partial_update VALUES
+ (1, 111, CAST(NULL AS STRING))",
+ )
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ let batches = handler
+ .sql("SELECT id, v_int, v_str FROM paimon.test_db.t_partial_update ORDER BY id")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ let mut rows = Vec::new();
+ for batch in &batches {
+ let ids = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .unwrap();
+ let ints = batch
+ .column_by_name("v_int")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .unwrap();
+ let strs = batch
+ .column_by_name("v_str")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .unwrap();
+ for i in 0..batch.num_rows() {
+ rows.push((
+ ids.value(i),
+ if ints.is_null(i) {
+ None
+ } else {
+ Some(ints.value(i))
+ },
+ if strs.is_null(i) {
+ None
+ } else {
+ Some(strs.value(i).to_string())
+ },
+ ));
+ }
+ }
+
+ assert_eq!(
+ rows,
+ vec![
+ (1, Some(111), Some("new-1".to_string())),
+ (2, Some(200), Some("old-2".to_string())),
+ (3, Some(30), None),
+ ]
+ );
+}
+
// ======================= Dedup Within Single Commit =======================
/// Duplicate keys in a single INSERT — last value wins (Deduplicate engine).
diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs
index 7cdbbc4..bafad0a 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -69,6 +69,8 @@ const BLOB_DESCRIPTOR_FIELD_OPTION: &str =
"blob-descriptor-field";
pub enum MergeEngine {
/// Keep the row with the highest sequence number (default).
Deduplicate,
+ /// Merge same-key rows field-by-field, usually keeping non-null updates.
+ PartialUpdate,
/// Keep the first row for each key (ignore later updates).
FirstRow,
}
@@ -127,6 +129,7 @@ impl<'a> CoreOptions<'a> {
None => Ok(MergeEngine::Deduplicate),
Some(v) => match v.to_ascii_lowercase().as_str() {
"deduplicate" => Ok(MergeEngine::Deduplicate),
+ "partial-update" => Ok(MergeEngine::PartialUpdate),
"first-row" => Ok(MergeEngine::FirstRow),
other => Err(crate::Error::Unsupported {
message: format!("Unsupported merge-engine: '{other}'"),
@@ -535,6 +538,14 @@ mod tests {
}
}
+ #[test]
+ fn test_merge_engine_accepts_partial_update() {
+ let options = HashMap::from([(MERGE_ENGINE_OPTION.to_string(),
"partial-update".into())]);
+ let core = CoreOptions::new(&options);
+
+ assert_eq!(core.merge_engine().unwrap(), MergeEngine::PartialUpdate);
+ }
+
#[test]
fn test_commit_options_defaults() {
let options = HashMap::new();
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index 6ad6533..270b27f 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -32,6 +32,9 @@ mod core_options;
pub(crate) use core_options::TimeTravelSelector;
pub use core_options::*;
+mod partial_update;
+pub(crate) use partial_update::PartialUpdateConfig;
+
mod schema;
pub use schema::*;
diff --git a/crates/paimon/src/spec/partial_update.rs b/crates/paimon/src/spec/partial_update.rs
new file mode 100644
index 0000000..b7ae1b6
--- /dev/null
+++ b/crates/paimon/src/spec/partial_update.rs
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+const MERGE_ENGINE_OPTION: &str = "merge-engine";
+const PARTIAL_UPDATE_ENGINE: &str = "partial-update";
+const IGNORE_DELETE_OPTION: &str = "ignore-delete";
+const IGNORE_DELETE_SUFFIX: &str = ".ignore-delete";
+const PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE_OPTION: &str =
+ "partial-update.remove-record-on-delete";
+const PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP_OPTION: &str =
+ "partial-update.remove-record-on-sequence-group";
+const FIELDS_DEFAULT_AGG_FUNCTION_OPTION: &str =
"fields.default-aggregate-function";
+const FIELDS_PREFIX: &str = "fields.";
+const SEQUENCE_GROUP_SUFFIX: &str = ".sequence-group";
+const AGGREGATION_FUNCTION_SUFFIX: &str = ".aggregate-function";
+
+/// Minimal partial-update mode recognized by the current Rust implementation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum PartialUpdateMode {
+ Basic,
+}
+
+/// Partial-update-specific option inspection and validation.
+///
+/// This build only recognizes the basic mode: `merge-engine=partial-update` on a
+/// primary-key table without delete, sequence-group, or aggregation controls.
+#[derive(Debug, Clone, Copy)]
+pub(crate) struct PartialUpdateConfig<'a> {
+ options: &'a HashMap<String, String>,
+}
+
+impl<'a> PartialUpdateConfig<'a> {
+ pub(crate) fn new(options: &'a HashMap<String, String>) -> Self {
+ Self { options }
+ }
+
+ pub(crate) fn is_enabled(&self) -> bool {
+ self.options
+ .get(MERGE_ENGINE_OPTION)
+ .is_some_and(|value|
value.eq_ignore_ascii_case(PARTIAL_UPDATE_ENGINE))
+ }
+
+ pub(crate) fn validate_create_mode(
+ &self,
+ has_primary_keys: bool,
+ ) -> crate::Result<Option<PartialUpdateMode>> {
+ match self.validated_mode(has_primary_keys) {
+ Ok(mode) => Ok(mode),
+ Err(unsupported_options) => Err(crate::Error::ConfigInvalid {
+ message: format!(
+ "merge-engine=partial-update only supports the basic mode in this build; unsupported options: {}",
+ unsupported_options.join(", ")
+ ),
+ }),
+ }
+ }
+
+ pub(crate) fn validate_runtime_mode(
+ &self,
+ has_primary_keys: bool,
+ table_name: &str,
+ ) -> crate::Result<Option<PartialUpdateMode>> {
+ match self.validated_mode(has_primary_keys) {
+ Ok(mode) => Ok(mode),
+ Err(unsupported_options) => Err(crate::Error::Unsupported {
+ message: format!(
+ "Table '{table_name}' uses merge-engine=partial-update options not supported by this build: {}",
+ unsupported_options.join(", ")
+ ),
+ }),
+ }
+ }
+
+ fn validated_mode(
+ &self,
+ has_primary_keys: bool,
+ ) -> std::result::Result<Option<PartialUpdateMode>, Vec<String>> {
+ if !has_primary_keys || !self.is_enabled() {
+ return Ok(None);
+ }
+
+ let unsupported_options = self.unsupported_option_keys();
+ if !unsupported_options.is_empty() {
+ return Err(unsupported_options);
+ }
+
+ Ok(Some(PartialUpdateMode::Basic))
+ }
+
+ fn unsupported_option_keys(&self) -> Vec<String> {
+ let mut keys: Vec<String> = self
+ .options
+ .keys()
+ .filter(|key| is_unsupported_partial_update_option(key))
+ .cloned()
+ .collect();
+ keys.sort();
+ keys
+ }
+}
+
+fn is_unsupported_partial_update_option(key: &str) -> bool {
+ key == IGNORE_DELETE_OPTION
+ || key.ends_with(IGNORE_DELETE_SUFFIX)
+ || key == PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE_OPTION
+ || key == PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP_OPTION
+ || key == FIELDS_DEFAULT_AGG_FUNCTION_OPTION
+ || is_fields_option_with_suffix(key, SEQUENCE_GROUP_SUFFIX)
+ || is_fields_option_with_suffix(key, AGGREGATION_FUNCTION_SUFFIX)
+}
+
+fn is_fields_option_with_suffix(key: &str, suffix: &str) -> bool {
+ key.starts_with(FIELDS_PREFIX) && key.ends_with(suffix)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn partial_update_options(extra: &[(&str, &str)]) -> HashMap<String,
String> {
+ let mut options = HashMap::from([(
+ MERGE_ENGINE_OPTION.to_string(),
+ PARTIAL_UPDATE_ENGINE.to_string(),
+ )]);
+ options.extend(
+ extra
+ .iter()
+ .map(|(key, value)| ((*key).to_string(),
(*value).to_string())),
+ );
+ options
+ }
+
+ #[test]
+ fn test_validate_create_mode_accepts_basic_pk_partial_update() {
+ let options = partial_update_options(&[]);
+ let config = PartialUpdateConfig::new(&options);
+
+ assert_eq!(
+ config.validate_create_mode(true).unwrap(),
+ Some(PartialUpdateMode::Basic)
+ );
+ }
+
+ #[test]
+ fn test_validate_create_mode_ignores_non_pk_tables() {
+ let options = partial_update_options(&[(IGNORE_DELETE_OPTION,
"true")]);
+ let config = PartialUpdateConfig::new(&options);
+
+ assert_eq!(config.validate_create_mode(false).unwrap(), None);
+ }
+
+ #[test]
+ fn test_validate_create_mode_rejects_unsupported_partial_update_options() {
+ for key in [
+ IGNORE_DELETE_OPTION,
+ "partial-update.ignore-delete",
+ PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE_OPTION,
+ PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP_OPTION,
+ "fields.price.sequence-group",
+ "fields.price.aggregate-function",
+ FIELDS_DEFAULT_AGG_FUNCTION_OPTION,
+ ] {
+ let options = partial_update_options(&[(key, "value")]);
+ let config = PartialUpdateConfig::new(&options);
+ let err = config.validate_create_mode(true).unwrap_err();
+
+ assert!(
+ matches!(err, crate::Error::ConfigInvalid { ref message } if message.contains(key)),
+ "expected create-time rejection to mention '{key}', got {err:?}"
+ );
+ }
+ }
+
+ #[test]
+ fn test_validate_runtime_mode_rejects_unsupported_partial_update_options()
{
+ let options =
+ partial_update_options(&[("fields.price.aggregate-function",
"last_non_null")]);
+ let config = PartialUpdateConfig::new(&options);
+ let err = config.validate_runtime_mode(true, "default.t").unwrap_err();
+
+ assert!(
+ matches!(err, crate::Error::Unsupported { ref message } if message.contains("fields.price.aggregate-function")),
+ "expected runtime rejection to mention the unsupported option, got {err:?}"
+ );
+ }
+}
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index e09027b..ddada6a 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -17,6 +17,7 @@
use crate::spec::core_options::CoreOptions;
use crate::spec::types::{ArrayType, DataType, MapType, MultisetType, RowType};
+use crate::spec::PartialUpdateConfig;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::collections::{HashMap, HashSet};
@@ -289,6 +290,7 @@ impl Schema {
let partition_keys = Self::normalize_partition_keys(&partition_keys,
&mut options)?;
let fields = Self::normalize_fields(&fields, &partition_keys,
&primary_keys)?;
Self::validate_blob_fields(&fields, &partition_keys, &options)?;
+
PartialUpdateConfig::new(&options).validate_create_mode(!primary_keys.is_empty())?;
Ok(Self {
fields,
@@ -917,6 +919,29 @@ mod tests {
assert_eq!(schema.fields().len(), 2);
}
+ #[test]
+ fn test_partial_update_schema_validation_rejects_unsupported_options() {
+ for (key, value) in [
+ ("ignore-delete", "true"),
+ ("fields.value.sequence-group", "g1"),
+ ("fields.default-aggregate-function", "last_non_null"),
+ ] {
+ let err = Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .primary_key(["id"])
+ .option("merge-engine", "partial-update")
+ .option(key, value)
+ .build()
+ .unwrap_err();
+
+ assert!(
+ matches!(err, crate::Error::ConfigInvalid { ref message } if message.contains(key)),
+ "partial-update create-time validation should reject '{key}', got {err:?}"
+ );
+ }
+ }
+
#[test]
fn test_schema_builder_column_row_type() {
let row_type = RowType::new(vec![DataField::new(
diff --git a/crates/paimon/src/table/bucket_assigner_cross.rs b/crates/paimon/src/table/bucket_assigner_cross.rs
index e60c9eb..a021761 100644
--- a/crates/paimon/src/table/bucket_assigner_cross.rs
+++ b/crates/paimon/src/table/bucket_assigner_cross.rs
@@ -147,19 +147,19 @@ impl GlobalPartitionIndex {
}
/// Assign a bucket for the given primary key targeting `new_partition`.
- fn assign(&mut self, pk_bytes: &[u8], new_partition: &[u8]) ->
AssignResult {
+ fn assign(&mut self, pk_bytes: &[u8], new_partition: &[u8]) ->
Result<AssignResult> {
if let Some((existing_partition, existing_bucket)) =
self.key_to_location.get(pk_bytes) {
if existing_partition == new_partition {
- return AssignResult::SamePartition {
+ return Ok(AssignResult::SamePartition {
bucket: *existing_bucket,
- };
+ });
}
// Key exists in a different partition
match self.merge_engine {
MergeEngine::FirstRow => {
// FIRST_ROW: keep old data, discard new row
- return AssignResult::Skip;
+ return Ok(AssignResult::Skip);
}
MergeEngine::Deduplicate => {
let old_partition = existing_partition.clone();
@@ -181,11 +181,16 @@ impl GlobalPartitionIndex {
self.key_to_location
.insert(pk_bytes.to_vec(), (new_partition.to_vec(),
new_bucket));
- return AssignResult::CrossPartition {
+ return Ok(AssignResult::CrossPartition {
old_partition,
old_bucket,
new_bucket,
- };
+ });
+ }
+ MergeEngine::PartialUpdate => {
+ return Err(crate::Error::Unsupported {
+ message: "CrossPartitionAssigner does not support merge-engine=partial-update yet".to_string(),
+ });
}
}
}
@@ -193,7 +198,7 @@ impl GlobalPartitionIndex {
let bucket = self.assign_bucket_in_partition(new_partition);
self.key_to_location
.insert(pk_bytes.to_vec(), (new_partition.to_vec(), bucket));
- AssignResult::SamePartition { bucket }
+ Ok(AssignResult::SamePartition { bucket })
}
fn assign_bucket_in_partition(&mut self, partition: &[u8]) -> i32 {
@@ -309,7 +314,7 @@ impl BucketAssigner for CrossPartitionAssigner {
let mut skips = Vec::new();
for row_idx in 0..num_rows {
- match global_index.assign(&pk_bytes_vec[row_idx],
&partition_bytes_vec[row_idx]) {
+ match global_index.assign(&pk_bytes_vec[row_idx],
&partition_bytes_vec[row_idx])? {
AssignResult::SamePartition { bucket } => {
buckets.push(bucket);
}
diff --git a/crates/paimon/src/table/cow_writer.rs b/crates/paimon/src/table/cow_writer.rs
index c2b5e95..7006f0f 100644
--- a/crates/paimon/src/table/cow_writer.rs
+++ b/crates/paimon/src/table/cow_writer.rs
@@ -223,6 +223,7 @@ impl CopyOnWriteMergeWriter {
let target_file_size = core_options.target_file_size();
let file_compression = core_options.file_compression().to_string();
let file_compression_zstd_level =
core_options.file_compression_zstd_level();
+ let file_format = core_options.file_format().to_string();
let write_buffer_size = core_options.write_parquet_buffer_size();
let file_format = core_options.file_format().to_string();
let schema_id = schema.id();
diff --git a/crates/paimon/src/table/kv_file_reader.rs b/crates/paimon/src/table/kv_file_reader.rs
index 1ad0f08..128b2da 100644
--- a/crates/paimon/src/table/kv_file_reader.rs
+++ b/crates/paimon/src/table/kv_file_reader.rs
@@ -24,11 +24,13 @@
//! Reference: Java Paimon `SortMergeReaderWithMinHeap`.
use super::data_file_reader::DataFileReader;
-use super::sort_merge::{DeduplicateMergeFunction, SortMergeReaderBuilder};
+use super::sort_merge::{
+ DeduplicateMergeFunction, PartialUpdateMergeFunction,
SortMergeReaderBuilder,
+};
use crate::arrow::build_target_arrow_schema;
use crate::io::FileIO;
use crate::spec::{
- BigIntType, DataField, DataType as PaimonDataType, Predicate, TinyIntType,
+ BigIntType, DataField, DataType as PaimonDataType, MergeEngine, Predicate,
TinyIntType,
SEQUENCE_NUMBER_FIELD_ID, SEQUENCE_NUMBER_FIELD_NAME, VALUE_KIND_FIELD_ID,
VALUE_KIND_FIELD_NAME,
};
@@ -39,6 +41,7 @@ use arrow_array::RecordBatch;
use async_stream::try_stream;
use futures::StreamExt;
+use std::collections::HashMap;
/// Reads primary-key table data files using sort-merge deduplication.
pub(crate) struct KeyValueFileReader {
@@ -49,12 +52,15 @@ pub(crate) struct KeyValueFileReader {
/// Configuration for [`KeyValueFileReader`], grouping table schema and
/// key/predicate parameters.
pub(crate) struct KeyValueReadConfig {
+ pub table_name: String,
+ pub table_options: HashMap<String, String>,
pub schema_manager: SchemaManager,
pub table_schema_id: i64,
pub table_fields: Vec<DataField>,
pub read_type: Vec<DataField>,
pub predicates: Vec<Predicate>,
pub primary_keys: Vec<String>,
+ pub merge_engine: MergeEngine,
pub sequence_fields: Vec<String>,
}
@@ -92,6 +98,23 @@ impl KeyValueFileReader {
}
}
+ fn new_merge_function(
+ merge_engine: MergeEngine,
+ table_options: &HashMap<String, String>,
+ table_name: &str,
+ ) -> crate::Result<Box<dyn super::sort_merge::MergeFunction>> {
+ match merge_engine {
+ MergeEngine::Deduplicate => Ok(Box::new(DeduplicateMergeFunction)),
+ MergeEngine::PartialUpdate =>
Ok(Box::new(PartialUpdateMergeFunction::new(
+ table_options,
+ table_name,
+ )?)),
+ MergeEngine::FirstRow => Err(Error::Unsupported {
+ message: "KeyValueFileReader does not support merge-engine=first-row; first-row reads should use the non-KV path".to_string(),
+ }),
+ }
+ }
+
pub fn read(self, data_splits: &[DataSplit]) ->
crate::Result<ArrowRecordBatchStream> {
// Build the internal read type for thin-mode files.
// Physical file schema: [_SEQUENCE_NUMBER, _VALUE_KIND,
all_user_cols...]
@@ -234,9 +257,12 @@ impl KeyValueFileReader {
let splits: Vec<DataSplit> = data_splits.to_vec();
let file_io = self.file_io;
+ let merge_engine = self.config.merge_engine;
let schema_manager = self.config.schema_manager;
let table_schema_id = self.config.table_schema_id;
let table_fields = self.config.table_fields;
+ let table_name = self.config.table_name;
+ let table_options = self.config.table_options;
let predicates = self.config.predicates;
// Build the merge output schema (keys + values, no system columns).
@@ -252,9 +278,8 @@ impl KeyValueFileReader {
.data_deletion_files()
.is_some_and(|files| files.iter().any(Option::is_some))
{
- Err(Error::UnexpectedError {
+ Err(Error::Unsupported {
message: "KeyValueFileReader does not support deletion vectors".to_string(),
- source: None,
})?;
}
@@ -303,7 +328,7 @@ impl KeyValueFileReader {
user_sequence_indices.clone(),
value_indices.clone(),
merge_output_schema.clone(),
- Box::new(DeduplicateMergeFunction),
+ Self::new_merge_function(merge_engine, &table_options,
&table_name)?,
)
.build()?;
diff --git a/crates/paimon/src/table/kv_file_writer.rs b/crates/paimon/src/table/kv_file_writer.rs
index 2004d6f..6b43445 100644
--- a/crates/paimon/src/table/kv_file_writer.rs
+++ b/crates/paimon/src/table/kv_file_writer.rs
@@ -31,7 +31,7 @@ use crate::io::FileIO;
use crate::spec::stats::{compute_column_stats, BinaryTableStats};
use crate::spec::{
extract_datum_from_arrow, BinaryRowBuilder, DataFileMeta, DataType,
MergeEngine,
- EMPTY_SERIALIZED_ROW, SEQUENCE_NUMBER_FIELD_NAME, VALUE_KIND_FIELD_NAME,
+ PartialUpdateConfig, EMPTY_SERIALIZED_ROW, SEQUENCE_NUMBER_FIELD_NAME,
VALUE_KIND_FIELD_NAME,
};
use crate::Result;
use arrow_array::{Int64Array, Int8Array, RecordBatch};
@@ -39,6 +39,7 @@ use arrow_ord::sort::{lexsort_to_indices, SortColumn,
SortOptions};
use arrow_row::{RowConverter, SortField};
use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as
ArrowSchema};
use chrono::Utc;
+use std::collections::HashMap;
use std::sync::Arc;
/// Internal writer for primary-key tables that buffers data in memory,
@@ -59,6 +60,8 @@ pub(crate) struct KeyValueFileWriter {
/// Configuration for [`KeyValueFileWriter`], grouping file-location, schema,
/// and key/merge parameters.
pub(crate) struct KeyValueWriteConfig {
+ pub table_name: String,
+ pub table_options: HashMap<String, String>,
pub table_location: String,
pub partition_path: String,
pub bucket: i32,
@@ -75,6 +78,8 @@ pub(crate) struct KeyValueWriteConfig {
pub sequence_field_indices: Vec<usize>,
/// Merge engine for deduplication.
pub merge_engine: MergeEngine,
+ pub dynamic_bucket_enabled: bool,
+ pub deletion_vectors_enabled: bool,
}
impl KeyValueFileWriter {
@@ -82,15 +87,38 @@ impl KeyValueFileWriter {
file_io: FileIO,
config: KeyValueWriteConfig,
next_sequence_number: i64,
- ) -> Self {
- Self {
+ ) -> Result<Self> {
+ if config.merge_engine == MergeEngine::PartialUpdate {
+ PartialUpdateConfig::new(&config.table_options)
+ .validate_runtime_mode(true, &config.table_name)?;
+
+ if config.deletion_vectors_enabled {
+ return Err(crate::Error::Unsupported {
+ message: format!(
+ "Table '{}' uses merge-engine=partial-update with deletion-vectors.enabled=true, which is not supported yet",
+ config.table_name
+ ),
+ });
+ }
+
+ if config.dynamic_bucket_enabled {
+ return Err(crate::Error::Unsupported {
+ message: format!(
+ "Table '{}' uses merge-engine=partial-update with bucket=-1, which is not supported yet; currently only fixed-bucket partial-update is supported",
+ config.table_name
+ ),
+ });
+ }
+ }
+
+ Ok(Self {
file_io,
config,
next_sequence_number,
buffer: Vec::new(),
buffer_bytes: 0,
written_files: Vec::new(),
- }
+ })
}
/// Buffer a RecordBatch. Flushes when buffer exceeds write_buffer_size.
@@ -185,16 +213,16 @@ impl KeyValueFileWriter {
source: None,
})?;
- // Deduplicate: for consecutive rows with the same PK, pick the winner.
// After sorting by PK + seq fields + auto-seq (all ascending):
- // Deduplicate → keep last row per key group (highest seq)
- // FirstRow → keep first row per key group (lowest seq)
- let deduped_indices = self.dedup_sorted_indices(&combined,
&sorted_indices)?;
- let deduped_num_rows = deduped_indices.len();
-
- // Extract min_key / max_key from deduped endpoints.
- let first_row = deduped_indices[0] as usize;
- let last_row = deduped_indices[deduped_num_rows - 1] as usize;
+ // Deduplicate → keep last row per key group (highest seq)
+ // FirstRow → keep first row per key group (lowest seq)
+ // PartialUpdate → keep all rows for read-side field-wise merge
+ let selected_indices = self.select_flush_indices(&combined,
&sorted_indices)?;
+ let selected_num_rows = selected_indices.len();
+
+ // Extract min_key / max_key from selected endpoints.
+ let first_row = selected_indices[0] as usize;
+ let last_row = selected_indices[selected_num_rows - 1] as usize;
let min_key = self.extract_key_binary_row(&combined, first_row)?;
let max_key = self.extract_key_binary_row(&combined, last_row)?;
@@ -231,11 +259,11 @@ impl KeyValueFileWriter {
)
.await?;
- // Chunked write using deduped indices.
- let deduped_u32 = arrow_array::UInt32Array::from(deduped_indices);
- for chunk_start in
(0..deduped_num_rows).step_by(Self::FLUSH_CHUNK_ROWS) {
- let chunk_len = Self::FLUSH_CHUNK_ROWS.min(deduped_num_rows -
chunk_start);
- let chunk_indices = deduped_u32.slice(chunk_start, chunk_len);
+ // Chunked write using selected indices.
+ let selected_u32 = arrow_array::UInt32Array::from(selected_indices);
+ for chunk_start in
(0..selected_num_rows).step_by(Self::FLUSH_CHUNK_ROWS) {
+ let chunk_len = Self::FLUSH_CHUNK_ROWS.min(selected_num_rows -
chunk_start);
+ let chunk_indices = selected_u32.slice(chunk_start, chunk_len);
let mut physical_columns: Vec<Arc<dyn arrow_array::Array>> =
Vec::new();
// Sequence numbers for this chunk.
@@ -296,20 +324,20 @@ impl KeyValueFileWriter {
let file_size = writer.close().await? as i64;
- // Compute key_stats on deduped data (not the raw combined batch).
- let deduped_key_columns: Vec<Arc<dyn arrow_array::Array>> =
- self.config
- .primary_key_indices
- .iter()
- .map(|&idx| {
- arrow_select::take::take(combined.column(idx).as_ref(),
&deduped_u32, None)
- .map_err(|e| crate::Error::DataInvalid {
- message: format!("Failed to take key column for stats: {e}"),
- source: None,
- })
- })
- .collect::<Result<Vec<_>>>()?;
- let deduped_key_batch = RecordBatch::try_new(
+ // Compute key_stats on selected output rows (not the raw combined
batch).
+ let selected_key_columns: Vec<Arc<dyn arrow_array::Array>> = self
+ .config
+ .primary_key_indices
+ .iter()
+ .map(|&idx| {
+ arrow_select::take::take(combined.column(idx).as_ref(),
&selected_u32, None)
+ .map_err(|e| crate::Error::DataInvalid {
+ message: format!("Failed to take key column for stats: {e}"),
+ source: None,
+ })
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let selected_key_batch = RecordBatch::try_new(
Arc::new(ArrowSchema::new(
self.config
.primary_key_indices
@@ -317,15 +345,15 @@ impl KeyValueFileWriter {
.map(|&idx| user_schema.field(idx).clone())
.collect::<Vec<_>>(),
)),
- deduped_key_columns,
+ selected_key_columns,
)
.map_err(|e| crate::Error::DataInvalid {
- message: format!("Failed to build deduped key batch for stats: {e}"),
+ message: format!("Failed to build selected key batch for stats: {e}"),
source: None,
})?;
let stats_col_indices: Vec<usize> =
(0..self.config.primary_key_indices.len()).collect();
let key_stats = compute_column_stats(
- &deduped_key_batch,
+ &selected_key_batch,
&stats_col_indices,
&self.config.primary_key_types,
)?;
@@ -334,7 +362,7 @@ impl KeyValueFileWriter {
let meta = DataFileMeta {
file_name,
file_size,
- row_count: deduped_num_rows as i64,
+ row_count: selected_num_rows as i64,
min_key,
max_key,
key_stats,
@@ -361,7 +389,26 @@ impl KeyValueFileWriter {
Ok(())
}
- /// Deduplicate sorted indices by primary key using the configured merge
engine.
+ /// Select output row indices from sorted inputs according to merge engine.
+ ///
+ /// Input: `sorted_indices` ordered by PK + seq fields + auto-seq (all
ascending).
+ /// Output: row indices to write in sorted PK order.
+ fn select_flush_indices(
+ &self,
+ batch: &RecordBatch,
+ sorted_indices: &arrow_array::UInt32Array,
+ ) -> Result<Vec<u32>> {
+ match self.config.merge_engine {
+ MergeEngine::Deduplicate | MergeEngine::FirstRow => {
+ self.dedup_sorted_indices(batch, sorted_indices)
+ }
+ MergeEngine::PartialUpdate => Ok((0..sorted_indices.len())
+ .map(|idx| sorted_indices.value(idx))
+ .collect()),
+ }
+ }
+
+ /// Deduplicate sorted indices by primary key for Deduplicate / FirstRow
engines.
///
/// Input: `sorted_indices` ordered by PK + seq fields + auto-seq (all
ascending).
/// Output: a Vec<u32> of original row indices to keep, in sorted PK order.
@@ -415,6 +462,9 @@ impl KeyValueFileWriter {
MergeEngine::Deduplicate => group_winner = cur,
// FirstRow: keep first (lowest seq), so don't update.
MergeEngine::FirstRow => {}
+ MergeEngine::PartialUpdate => unreachable!(
+ "partial-update should use select_flush_indices and skip dedup"
+ ),
}
} else {
// New key group — emit the winner of the previous group.
@@ -473,3 +523,154 @@ pub(crate) fn build_physical_schema(user_schema:
&ArrowSchema) -> Arc<ArrowSchem
}
Arc::new(ArrowSchema::new(physical_fields))
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::io::FileIOBuilder;
+ use crate::spec::IntType;
+ use arrow_array::{Int32Array, UInt32Array};
+ use std::collections::HashMap;
+
+ fn test_write_config(merge_engine: MergeEngine) -> KeyValueWriteConfig {
+ let mut table_options = HashMap::new();
+ if merge_engine == MergeEngine::PartialUpdate {
+ table_options.insert("merge-engine".to_string(),
"partial-update".to_string());
+ }
+
+ KeyValueWriteConfig {
+ table_name: "default.test_table".to_string(),
+ table_options,
+ table_location: "memory:/kv-test".to_string(),
+ partition_path: String::new(),
+ bucket: 0,
+ schema_id: 0,
+ file_compression: "none".to_string(),
+ file_compression_zstd_level: 0,
+ write_buffer_size: 1024,
+ file_format: "parquet".to_string(),
+ primary_key_indices: vec![0],
+ primary_key_types: vec![DataType::Int(IntType::new())],
+ sequence_field_indices: vec![1],
+ merge_engine,
+ dynamic_bucket_enabled: false,
+ deletion_vectors_enabled: false,
+ }
+ }
+
+ fn first_row_writer() -> KeyValueFileWriter {
+ KeyValueFileWriter::new(
+ FileIOBuilder::new("memory").build().unwrap(),
+ test_write_config(MergeEngine::FirstRow),
+ 0,
+ )
+ .unwrap()
+ }
+
+ #[test]
+ fn test_dedup_sorted_indices_keeps_first_row_for_first_row_engine() {
+ let schema = Arc::new(ArrowSchema::new(vec![
+ Arc::new(ArrowField::new("id", ArrowDataType::Int32, false)),
+ Arc::new(ArrowField::new("seq", ArrowDataType::Int64, false)),
+ Arc::new(ArrowField::new("value", ArrowDataType::Int32, false)),
+ ]));
+ let batch = RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as Arc<dyn
arrow_array::Array>,
+ Arc::new(Int64Array::from(vec![10, 20, 5, 6])) as Arc<dyn
arrow_array::Array>,
+ Arc::new(Int32Array::from(vec![100, 200, 300, 400])) as
Arc<dyn arrow_array::Array>,
+ ],
+ )
+ .unwrap();
+ let sorted_indices = UInt32Array::from(vec![0, 1, 2, 3]);
+
+ let deduped = first_row_writer()
+ .dedup_sorted_indices(&batch, &sorted_indices)
+ .unwrap();
+
+ assert_eq!(deduped, vec![0, 2]);
+ }
+
+ #[test]
+ fn test_select_flush_indices_keeps_all_rows_for_partial_update_engine() {
+ let schema = Arc::new(ArrowSchema::new(vec![
+ Arc::new(ArrowField::new("id", ArrowDataType::Int32, false)),
+ Arc::new(ArrowField::new("seq", ArrowDataType::Int64, false)),
+ ]));
+ let batch = RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(Int32Array::from(vec![1, 1])) as Arc<dyn
arrow_array::Array>,
+ Arc::new(Int64Array::from(vec![10, 20])) as Arc<dyn
arrow_array::Array>,
+ ],
+ )
+ .unwrap();
+ let sorted_indices = UInt32Array::from(vec![0, 1]);
+ let writer = KeyValueFileWriter::new(
+ FileIOBuilder::new("memory").build().unwrap(),
+ test_write_config(MergeEngine::PartialUpdate),
+ 0,
+ )
+ .unwrap();
+
+ let selected = writer
+ .select_flush_indices(&batch, &sorted_indices)
+ .unwrap();
+
+ assert_eq!(selected, vec![0, 1]);
+ }
+
+ #[test]
+ fn test_new_rejects_partial_update_dynamic_bucket() {
+ let mut config = test_write_config(MergeEngine::PartialUpdate);
+ config.dynamic_bucket_enabled = true;
+
+ let err =
KeyValueFileWriter::new(FileIOBuilder::new("memory").build().unwrap(), config,
0)
+ .err()
+ .unwrap();
+
+ assert!(matches!(
+ err,
+ crate::Error::Unsupported { message } if
message.contains("bucket=-1")
+ ));
+ }
+
+ #[test]
+ fn test_new_rejects_partial_update_with_deletion_vectors() {
+ let mut config = test_write_config(MergeEngine::PartialUpdate);
+ config.deletion_vectors_enabled = true;
+
+ let err =
KeyValueFileWriter::new(FileIOBuilder::new("memory").build().unwrap(), config,
0)
+ .err()
+ .unwrap();
+
+ assert!(matches!(
+ err,
+ crate::Error::Unsupported { message }
+ if message.contains("deletion-vectors.enabled=true")
+ ));
+ }
+
+ #[test]
+ fn test_new_rejects_unsupported_partial_update_options() {
+ let mut config = test_write_config(MergeEngine::PartialUpdate);
+ config.table_options = HashMap::from([
+ ("merge-engine".to_string(), "partial-update".to_string()),
+ (
+ "fields.price.aggregate-function".to_string(),
+ "last_non_null".to_string(),
+ ),
+ ]);
+
+ let err =
KeyValueFileWriter::new(FileIOBuilder::new("memory").build().unwrap(), config,
0)
+ .err()
+ .unwrap();
+
+ assert!(matches!(
+ err,
+ crate::Error::Unsupported { message }
+ if message.contains("fields.price.aggregate-function")
+ ));
+ }
+}
diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs
index b53e767..3e7684d 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -342,6 +342,28 @@ mod tests {
)
}
+ fn partial_update_dv_pk_table() -> Table {
+ let file_io = FileIOBuilder::new("file").build().unwrap();
+ let table_schema = TableSchema::new(
+ 0,
+ &Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .primary_key(["id"])
+ .option("merge-engine", "partial-update")
+ .option("deletion-vectors.enabled", "true")
+ .build()
+ .unwrap(),
+ );
+ Table::new(
+ file_io,
+ Identifier::new("default", "partial_update_dv_t"),
+ "/tmp/test-partial-update-dv-read-builder".to_string(),
+ table_schema,
+ None,
+ )
+ }
+
#[test]
fn test_exact_filter_pushdown_is_true_for_partition_only_filter() {
let table = simple_table();
@@ -699,4 +721,35 @@ mod tests {
assert_eq!(ranges[0].from(), 0);
assert_eq!(ranges[0].to(), 5);
}
+
+ #[tokio::test]
+ async fn
test_direct_table_read_rejects_partial_update_with_deletion_vectors() {
+ let table = partial_update_dv_pk_table();
+ let split = DataSplitBuilder::new()
+ .with_snapshot(1)
+ .with_partition(BinaryRow::new(0))
+ .with_bucket(0)
+
.with_bucket_path("/tmp/test-partial-update-dv-read-builder/bucket-0".to_string())
+ .with_total_buckets(1)
+ .with_data_files(vec![test_data_file("data.parquet", 1, 0)])
+
.with_data_deletion_files(vec![Some(crate::table::source::DeletionFile::new(
+
"/tmp/test-partial-update-dv-read-builder/index/dv".to_string(),
+ 0,
+ 0,
+ None,
+ ))])
+ .build()
+ .unwrap();
+ let err = TableRead::new(&table, table.schema().fields().to_vec(),
Vec::new())
+ .to_arrow(&[split])
+ .unwrap()
+ .try_collect::<Vec<_>>()
+ .await
+ .unwrap_err();
+
+ assert!(
+ matches!(err, crate::Error::Unsupported { ref message } if
message.contains("deletion vectors")),
+ "expected partial-update+DV read to fail fast with Unsupported,
got {err:?}"
+ );
+ }
}
diff --git a/crates/paimon/src/table/sort_merge.rs
b/crates/paimon/src/table/sort_merge.rs
index b7dcbd2..8a9ece4 100644
--- a/crates/paimon/src/table/sort_merge.rs
+++ b/crates/paimon/src/table/sort_merge.rs
@@ -26,21 +26,47 @@
//! - DataFusion: `SortPreservingMergeStream` (LoserTree layout)
//! - Arrow-row: `RowConverter` for efficient key comparison
-use crate::spec::RowKind;
+use crate::spec::{PartialUpdateConfig, RowKind};
use crate::table::ArrowRecordBatchStream;
use crate::Error;
-use arrow_array::{ArrayRef, Int64Array, Int8Array, RecordBatch};
+use arrow_array::{new_null_array, ArrayRef, Int64Array, Int8Array,
RecordBatch};
use arrow_row::{RowConverter, Rows, SortField};
use arrow_schema::SchemaRef;
use arrow_select::interleave::interleave;
use async_stream::try_stream;
use futures::StreamExt;
use std::cmp::Ordering;
+use std::collections::HashMap;
// ---------------------------------------------------------------------------
// MergeFunction
// ---------------------------------------------------------------------------
+/// Buffered batches used by the merge reader.
+///
+/// Source batches keep the internal read schema, while materialized batches
+/// already match the merge output schema.
+#[derive(Clone)]
+pub(crate) enum BufferedBatch {
+ Source(RecordBatch),
+ Materialized(RecordBatch),
+}
+
+impl BufferedBatch {
+ fn column_for_output<'a>(
+ &'a self,
+ output_col_idx: usize,
+ source_output_col_indices: &[usize],
+ ) -> &'a dyn arrow_array::Array {
+ match self {
+ Self::Source(batch) => batch
+ .column(source_output_col_indices[output_col_idx])
+ .as_ref(),
+ Self::Materialized(batch) => batch.column(output_col_idx).as_ref(),
+ }
+ }
+}
+
/// A row reference as an index into the batch buffer.
pub(crate) struct MergeRow {
/// Index into the shared batch buffer.
@@ -52,15 +78,55 @@ pub(crate) struct MergeRow {
pub user_sequences: Vec<Option<i128>>,
}
+#[cfg(test)]
+impl MergeRow {
+ fn source_batch<'a>(
+ &self,
+ batch_buffer: &'a [BufferedBatch],
+ ) -> crate::Result<&'a RecordBatch> {
+ match batch_buffer.get(self.batch_idx) {
+ Some(BufferedBatch::Source(batch)) => Ok(batch),
+ Some(BufferedBatch::Materialized(_)) => Err(Error::UnexpectedError
{
+ message: format!(
+ "Merge row unexpectedly referenced a materialized batch at
index {}",
+ self.batch_idx
+ ),
+ source: None,
+ }),
+ None => Err(Error::UnexpectedError {
+ message: format!(
+ "Merge row referenced batch index {} outside the current
buffer",
+ self.batch_idx
+ ),
+ source: None,
+ }),
+ }
+ }
+}
+
+/// Merge result for rows sharing the same primary key.
+pub(crate) enum MergeResult {
+ /// Reuse an existing source row from the batch buffer.
+ SourceRow { batch_idx: usize, row_idx: usize },
+ /// Emit a synthesized one-row batch matching the merge output schema.
+ MaterializedRow(RecordBatch),
+ /// Omit this key from the output.
+ Omit,
+}
+
/// Merge function applied to rows sharing the same primary key.
///
-/// For deduplicate: returns the single winner (batch_idx, row_idx), or None
-/// if the winning row should be filtered out (e.g. DELETE).
+/// Deduplicate-style engines can keep returning a source row. Future
+/// field-wise engines may instead materialize a new output row.
pub(crate) trait MergeFunction: Send + Sync {
- /// Pick the winning row from same-key candidates.
- /// Returns `Some((batch_idx, row_idx))` of the winner, or `None` if the
- /// key should be omitted from output (e.g. winner is a DELETE row).
- fn pick_winner(&self, rows: &[MergeRow]) -> crate::Result<Option<(usize,
usize)>>;
+ /// Merge all rows sharing the same key into a final output result.
+ fn merge(
+ &self,
+ rows: &[MergeRow],
+ batch_buffer: &[BufferedBatch],
+ source_output_col_indices: &[usize],
+ output_schema: &SchemaRef,
+ ) -> crate::Result<MergeResult>;
}
/// Deduplicate merge: keeps the row with the highest sequence.
@@ -71,19 +137,28 @@ pub(crate) trait MergeFunction: Send + Sync {
/// Filters out DELETE and UPDATE_BEFORE rows.
pub(crate) struct DeduplicateMergeFunction;
+fn compare_sequence_order(lhs: &MergeRow, rhs: &MergeRow) -> Ordering {
+ match (lhs.user_sequences.is_empty(), rhs.user_sequences.is_empty()) {
+ (false, false) => lhs
+ .user_sequences
+ .cmp(&rhs.user_sequences)
+ .then_with(|| lhs.sequence_number.cmp(&rhs.sequence_number)),
+ _ => lhs.sequence_number.cmp(&rhs.sequence_number),
+ }
+}
+
impl MergeFunction for DeduplicateMergeFunction {
- fn pick_winner(&self, rows: &[MergeRow]) -> crate::Result<Option<(usize,
usize)>> {
+ fn merge(
+ &self,
+ rows: &[MergeRow],
+ _batch_buffer: &[BufferedBatch],
+ _source_output_col_indices: &[usize],
+ _output_schema: &SchemaRef,
+ ) -> crate::Result<MergeResult> {
let winner = rows
.iter()
.reduce(|best, r| {
- // Compare user sequences lexicographically first (if
present), then system sequence.
- let ord = match (r.user_sequences.is_empty(),
best.user_sequences.is_empty()) {
- (false, false) => r
- .user_sequences
- .cmp(&best.user_sequences)
- .then_with(||
r.sequence_number.cmp(&best.sequence_number)),
- _ => r.sequence_number.cmp(&best.sequence_number),
- };
+ let ord = compare_sequence_order(r, best);
// >= semantics: last-writer-wins for equal values.
if ord.is_ge() {
r
@@ -93,13 +168,110 @@ impl MergeFunction for DeduplicateMergeFunction {
})
.expect("merge called with empty rows");
if RowKind::from_value(winner.value_kind)?.is_add() {
- Ok(Some((winner.batch_idx, winner.row_idx)))
+ Ok(MergeResult::SourceRow {
+ batch_idx: winner.batch_idx,
+ row_idx: winner.row_idx,
+ })
} else {
- Ok(None)
+ Ok(MergeResult::Omit)
}
}
}
+/// Basic partial-update merge: for each non-key column, keep the latest
+/// non-null value ordered by user sequence (if configured) then system
sequence.
+///
+/// DELETE / UPDATE_BEFORE rows are treated as unsupported in this mode.
+#[derive(Debug, Clone, Copy)]
+pub(crate) struct PartialUpdateMergeFunction(());
+
+impl PartialUpdateMergeFunction {
+ pub(crate) fn new(
+ table_options: &HashMap<String, String>,
+ table_name: &str,
+ ) -> crate::Result<Self> {
+ PartialUpdateConfig::new(table_options).validate_runtime_mode(true,
table_name)?;
+ Ok(Self(()))
+ }
+}
+
+impl MergeFunction for PartialUpdateMergeFunction {
+ fn merge(
+ &self,
+ rows: &[MergeRow],
+ batch_buffer: &[BufferedBatch],
+ source_output_col_indices: &[usize],
+ output_schema: &SchemaRef,
+ ) -> crate::Result<MergeResult> {
+ if rows.is_empty() {
+ return Err(Error::UnexpectedError {
+ message: "merge called with empty rows".to_string(),
+ source: None,
+ });
+ }
+
+ let mut ordered_row_indices: Vec<usize> = (0..rows.len()).collect();
+ ordered_row_indices.sort_by(|&lhs_idx, &rhs_idx| {
+ compare_sequence_order(&rows[lhs_idx], &rows[rhs_idx])
+ .then_with(|| lhs_idx.cmp(&rhs_idx))
+ });
+
+ let mut latest_non_null_by_col: Vec<Option<(usize, usize)>> =
+ vec![None; output_schema.fields().len()];
+
+ for row_idx in ordered_row_indices {
+ let row = &rows[row_idx];
+ if !RowKind::from_value(row.value_kind)?.is_add() {
+ return Err(crate::Error::Unsupported {
+ message: "merge-engine=partial-update basic mode does not
support DELETE or UPDATE_BEFORE rows".to_string(),
+ });
+ }
+
+ for (output_col_idx, latest_non_null) in
latest_non_null_by_col.iter_mut().enumerate() {
+ let source_array = batch_buffer[row.batch_idx]
+ .column_for_output(output_col_idx,
source_output_col_indices);
+ if !source_array.is_null(row.row_idx) {
+ *latest_non_null = Some((row.batch_idx, row.row_idx));
+ }
+ }
+ }
+
+ let output_columns: Vec<ArrayRef> = output_schema
+ .fields()
+ .iter()
+ .enumerate()
+ .map(|(output_col_idx, field)| {
+ Ok(match latest_non_null_by_col[output_col_idx] {
+ Some((batch_idx, row_idx)) => batch_buffer[batch_idx]
+ .column_for_output(output_col_idx,
source_output_col_indices)
+ .slice(row_idx, 1),
+ None => {
+ if !field.is_nullable() {
+ return Err(Error::DataInvalid {
+ message: format!(
+ "merge-engine=partial-update produced NULL
for non-nullable field '{}'",
+ field.name()
+ ),
+ source: None,
+ });
+ }
+ new_null_array(field.data_type(), 1)
+ }
+ })
+ })
+ .collect::<crate::Result<Vec<_>>>()?;
+
+ let batch = RecordBatch::try_new(output_schema.clone(),
output_columns).map_err(|e| {
+ Error::UnexpectedError {
+ message: format!("Failed to build partial-update materialized
row: {e}"),
+ source: Some(Box::new(e)),
+ }
+ })?;
+
+ Ok(MergeResult::MaterializedRow(batch))
+ }
+}
+
// ---------------------------------------------------------------------------
// SortMergeCursor
// ---------------------------------------------------------------------------
@@ -318,7 +490,7 @@ impl SortMergeReaderBuilder {
}
}
- #[allow(dead_code)]
+ #[cfg(test)]
pub(crate) fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
@@ -405,8 +577,9 @@ fn sort_merge_stream(
return Ok(futures::stream::empty().boxed());
}
- // Output column indices: key columns + value columns (skip
_SEQUENCE_NUMBER).
- let output_col_indices: Vec<usize> = key_indices
+ // Output column indices for source batches: key columns + value columns
+ // (skip system columns like _SEQUENCE_NUMBER).
+ let source_output_col_indices: Vec<usize> = key_indices
.iter()
.chain(value_indices.iter())
.copied()
@@ -440,7 +613,7 @@ fn sort_merge_stream(
// Each cursor's current batch gets an entry; when a cursor advances
// to a new batch, the old one stays in the buffer until the output
// batch is flushed.
- let mut batch_buffer: Vec<RecordBatch> = Vec::new();
+ let mut batch_buffer: Vec<BufferedBatch> = Vec::new();
// Map from stream_idx -> current batch_buffer index.
let mut stream_batch_idx: Vec<Option<usize>> = vec![None; num_streams];
@@ -448,7 +621,7 @@ fn sort_merge_stream(
for (i, cursor) in cursors.iter().enumerate() {
if let Some(c) = cursor {
let idx = batch_buffer.len();
- batch_buffer.push(c.batch.clone());
+ batch_buffer.push(BufferedBatch::Source(c.batch.clone()));
stream_batch_idx[i] = Some(idx);
}
}
@@ -508,7 +681,7 @@ fn sort_merge_stream(
if batch.num_rows() > 0 {
let rows = convert_batch_keys(&batch,
&key_indices, &mut row_converter)?;
let buf_idx = batch_buffer.len();
- batch_buffer.push(batch.clone());
+
batch_buffer.push(BufferedBatch::Source(batch.clone()));
stream_batch_idx[current_winner] =
Some(buf_idx);
cursors[current_winner] = Some(SortMergeCursor
{ batch, rows, offset: 0 });
break;
@@ -521,10 +694,36 @@ fn sort_merge_stream(
tree.update(|a, b| compare_cursors(&cursors, a,
b).then_with(|| a.cmp(&b)).is_gt());
}
- // Apply merge function to pick the winner row.
- // Returns None if the winning row is a DELETE/UPDATE_BEFORE —
skip it.
- if let Some((win_batch_idx, win_row_idx)) =
merge_function.pick_winner(&same_key_rows)? {
- output_indices.push((win_batch_idx, win_row_idx));
+ match merge_function.merge(
+ &same_key_rows,
+ &batch_buffer,
+ &source_output_col_indices,
+ &output_schema,
+ )? {
+ MergeResult::SourceRow { batch_idx, row_idx } => {
+ output_indices.push((batch_idx, row_idx));
+ }
+ MergeResult::MaterializedRow(batch) => {
+ if batch.num_rows() != 1 {
+ Err(Error::UnexpectedError {
+ message: format!(
+ "Materialized merge result must contain
exactly one row, got {}",
+ batch.num_rows()
+ ),
+ source: None,
+ })?;
+ }
+ if batch.schema().as_ref() != output_schema.as_ref() {
+ Err(Error::UnexpectedError {
+ message: "Materialized merge result schema does
not match merge output schema".to_string(),
+ source: None,
+ })?;
+ }
+ let batch_idx = batch_buffer.len();
+ batch_buffer.push(BufferedBatch::Materialized(batch));
+ output_indices.push((batch_idx, 0));
+ }
+ MergeResult::Omit => {}
}
// Yield a batch when we've accumulated enough rows.
@@ -532,13 +731,14 @@ fn sort_merge_stream(
let batch = build_output_interleave(
&output_schema,
&batch_buffer,
- &output_col_indices,
+ &source_output_col_indices,
&output_indices,
)?;
output_indices.clear();
- // Compact batch buffer: only keep batches still referenced by
cursors.
- // SAFETY: output_indices was just cleared above, so no stale
references
- // exist into the buffer. The yield below happens after
compaction.
+ // Compact batch buffer after the pending output rows have been
+ // materialized. Source batches still referenced by cursors
stay
+ // alive; materialized batches can be dropped here because they
+ // are referenced only by the flushed output_indices above.
compact_batch_buffer(
&mut batch_buffer,
&mut stream_batch_idx,
@@ -553,7 +753,7 @@ fn sort_merge_stream(
let batch = build_output_interleave(
&output_schema,
&batch_buffer,
- &output_col_indices,
+ &source_output_col_indices,
&output_indices,
)?;
yield batch;
@@ -566,20 +766,18 @@ fn sort_merge_stream(
/// batch buffer in one pass per column.
fn build_output_interleave(
schema: &SchemaRef,
- batch_buffer: &[RecordBatch],
- output_col_indices: &[usize],
+ batch_buffer: &[BufferedBatch],
+ source_output_col_indices: &[usize],
indices: &[(usize, usize)],
) -> crate::Result<RecordBatch> {
- let columns: Vec<ArrayRef> = output_col_indices
- .iter()
- .map(|&col_idx| {
- // Collect all arrays for this column from the batch buffer.
+ let columns: Vec<ArrayRef> = (0..schema.fields().len())
+ .map(|output_col_idx| {
let arrays: Vec<&dyn arrow_array::Array> = batch_buffer
.iter()
- .map(|b| b.column(col_idx).as_ref())
+ .map(|batch| batch.column_for_output(output_col_idx,
source_output_col_indices))
.collect();
interleave(&arrays, indices).map_err(|e| Error::UnexpectedError {
- message: format!("Failed to interleave column {col_idx}: {e}"),
+ message: format!("Failed to interleave output column
{output_col_idx}: {e}"),
source: Some(Box::new(e)),
})
})
@@ -594,7 +792,7 @@ fn build_output_interleave(
/// Compact the batch buffer by removing batches no longer referenced by any
/// cursor, and updating indices accordingly.
fn compact_batch_buffer(
- batch_buffer: &mut Vec<RecordBatch>,
+ batch_buffer: &mut Vec<BufferedBatch>,
stream_batch_idx: &mut [Option<usize>],
cursors: &[Option<SortMergeCursor>],
) {
@@ -610,7 +808,7 @@ fn compact_batch_buffer(
// Build old->new index mapping.
let mut new_indices: Vec<Option<usize>> = vec![None; batch_buffer.len()];
- let mut new_buffer: Vec<RecordBatch> = Vec::new();
+ let mut new_buffer: Vec<BufferedBatch> = Vec::new();
for (old_idx, is_alive) in alive.iter().enumerate() {
if *is_alive {
new_indices[old_idx] = Some(new_buffer.len());
@@ -642,6 +840,7 @@ mod tests {
use arrow_array::{Array, Int32Array, Int64Array, Int8Array, StringArray};
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
+ use std::collections::HashMap;
use std::sync::Arc;
fn make_schema() -> SchemaRef {
@@ -693,6 +892,41 @@ mod tests {
futures::stream::iter(batches.into_iter().map(Ok)).boxed()
}
+ struct MaterializingMergeFunction;
+
+ impl MergeFunction for MaterializingMergeFunction {
+ fn merge(
+ &self,
+ rows: &[MergeRow],
+ batch_buffer: &[BufferedBatch],
+ source_output_col_indices: &[usize],
+ output_schema: &SchemaRef,
+ ) -> crate::Result<MergeResult> {
+ let first = rows.first().expect("merge called with empty rows");
+ let source_batch = first.source_batch(batch_buffer)?;
+ let pk = source_batch
+ .column(source_output_col_indices[0])
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("pk column must be Int32")
+ .value(first.row_idx);
+
+ let batch = RecordBatch::try_new(
+ output_schema.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![pk])) as ArrayRef,
+ Arc::new(StringArray::from(vec![Some("merged")])) as
ArrayRef,
+ ],
+ )
+ .map_err(|e| Error::UnexpectedError {
+ message: format!("Failed to build materialized merge batch:
{e}"),
+ source: Some(Box::new(e)),
+ })?;
+
+ Ok(MergeResult::MaterializedRow(batch))
+ }
+ }
+
#[tokio::test]
async fn test_loser_tree_basic() {
// 3 streams, verify init produces correct winner
@@ -1264,4 +1498,225 @@ mod tests {
assert_eq!(pks, vec![1, 2, 3, 4]);
assert_eq!(values, vec!["a", "b", "c", "d"]);
}
+
+ #[tokio::test]
+ async fn test_materialized_merge_result_path() {
+ let schema = make_schema();
+ let s0 = stream_from_batches(vec![make_batch(
+ &schema,
+ vec![1, 2],
+ vec![1, 1],
+ vec![Some("old_a"), Some("old_b")],
+ )]);
+ let s1 = stream_from_batches(vec![make_batch(
+ &schema,
+ vec![1, 3],
+ vec![2, 1],
+ vec![Some("new_a"), Some("c")],
+ )]);
+
+ let result = SortMergeReaderBuilder::new(
+ vec![s0, s1],
+ schema,
+ vec![0],
+ 1,
+ 2,
+ vec![],
+ vec![3],
+ make_output_schema(),
+ Box::new(MaterializingMergeFunction),
+ )
+ .build()
+ .unwrap()
+ .try_collect::<Vec<_>>()
+ .await
+ .unwrap();
+
+ let pks: Vec<i32> = result
+ .iter()
+ .flat_map(|b| {
+ b.column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .values()
+ .iter()
+ .copied()
+ })
+ .collect();
+ let values: Vec<String> = result
+ .iter()
+ .flat_map(|b| {
+ let arr =
b.column(1).as_any().downcast_ref::<StringArray>().unwrap();
+ (0..arr.len())
+ .map(|i| arr.value(i).to_string())
+ .collect::<Vec<_>>()
+ })
+ .collect();
+
+ assert_eq!(pks, vec![1, 2, 3]);
+ assert_eq!(values, vec!["merged", "merged", "merged"]);
+ }
+
+ #[tokio::test]
+ async fn test_partial_update_merge_keeps_latest_non_null_values() {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("pk", DataType::Int32, false),
+ Field::new("_SEQUENCE_NUMBER", DataType::Int64, false),
+ Field::new("_VALUE_KIND", DataType::Int8, false),
+ Field::new("v_int", DataType::Int32, true),
+ Field::new("v_str", DataType::Utf8, true),
+ ]));
+ let output_schema = Arc::new(Schema::new(vec![
+ Field::new("pk", DataType::Int32, false),
+ Field::new("v_int", DataType::Int32, true),
+ Field::new("v_str", DataType::Utf8, true),
+ ]));
+
+ let s0 = stream_from_batches(vec![RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2])),
+ Arc::new(Int64Array::from(vec![1, 1])),
+ Arc::new(Int8Array::from(vec![0, 0])),
+ Arc::new(Int32Array::from(vec![10, 20])),
+ Arc::new(StringArray::from(vec![Some("old-1"),
Some("old-2")])),
+ ],
+ )
+ .unwrap()]);
+ let s1 = stream_from_batches(vec![RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3])),
+ Arc::new(Int64Array::from(vec![2, 2, 1])),
+ Arc::new(Int8Array::from(vec![0, 0, 0])),
+ Arc::new(Int32Array::from(vec![None, Some(200), Some(30)])),
+ Arc::new(StringArray::from(vec![Some("new-1"), None, None])),
+ ],
+ )
+ .unwrap()]);
+
+ let result = SortMergeReaderBuilder::new(
+ vec![s0, s1],
+ schema,
+ vec![0],
+ 1,
+ 2,
+ vec![],
+ vec![3, 4],
+ output_schema,
+ Box::new(PartialUpdateMergeFunction::new(&HashMap::new(),
"test_table").unwrap()),
+ )
+ .build()
+ .unwrap()
+ .try_collect::<Vec<_>>()
+ .await
+ .unwrap();
+
+ let mut rows: Vec<(i32, Option<i32>, Option<String>)> = Vec::new();
+ for batch in &result {
+ let ids = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ let ints = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ let strs = batch
+ .column(2)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ for i in 0..batch.num_rows() {
+ rows.push((
+ ids.value(i),
+ if ints.is_null(i) {
+ None
+ } else {
+ Some(ints.value(i))
+ },
+ if strs.is_null(i) {
+ None
+ } else {
+ Some(strs.value(i).to_string())
+ },
+ ));
+ }
+ }
+ rows.sort_by_key(|row| row.0);
+
+ assert_eq!(
+ rows,
+ vec![
+ (1, Some(10), Some("new-1".to_string())),
+ (2, Some(200), Some("old-2".to_string())),
+ (3, Some(30), None),
+ ]
+ );
+ }
+
+ #[tokio::test]
+ async fn test_partial_update_merge_rejects_delete_like_rows() {
+ let schema = make_schema();
+ let output_schema = make_output_schema();
+ let s0 = stream_from_batches(vec![make_batch_with_kind(
+ &schema,
+ vec![1],
+ vec![1],
+ vec![0],
+ vec![Some("old")],
+ )]);
+ let s1 = stream_from_batches(vec![make_batch_with_kind(
+ &schema,
+ vec![1],
+ vec![2],
+ vec![3],
+ vec![Some("delete")],
+ )]);
+
+ let err = SortMergeReaderBuilder::new(
+ vec![s0, s1],
+ schema,
+ vec![0],
+ 1,
+ 2,
+ vec![],
+ vec![3],
+ output_schema,
+ Box::new(PartialUpdateMergeFunction::new(&HashMap::new(),
"test_table").unwrap()),
+ )
+ .build()
+ .unwrap()
+ .try_collect::<Vec<_>>()
+ .await
+ .unwrap_err();
+
+ assert!(matches!(
+ err,
+ Error::Unsupported { message }
+ if message.contains("partial-update basic mode does not support
DELETE or UPDATE_BEFORE")
+ ));
+ }
+
+ #[test]
+ fn test_partial_update_merge_function_new_rejects_unsupported_options() {
+ let options = HashMap::from([
+ ("merge-engine".to_string(), "partial-update".to_string()),
+ (
+ "fields.price.aggregate-function".to_string(),
+ "last_non_null".to_string(),
+ ),
+ ]);
+
+ let err = PartialUpdateMergeFunction::new(&options,
"default.t").unwrap_err();
+
+ assert!(matches!(
+ err,
+ Error::Unsupported { message }
+ if message.contains("fields.price.aggregate-function")
+ ));
+ }
}
diff --git a/crates/paimon/src/table/table_read.rs
b/crates/paimon/src/table/table_read.rs
index 078f942..5493938 100644
--- a/crates/paimon/src/table/table_read.rs
+++ b/crates/paimon/src/table/table_read.rs
@@ -21,7 +21,7 @@ use super::kv_file_reader::{KeyValueFileReader,
KeyValueReadConfig};
use super::read_builder::split_scan_predicates;
use super::{ArrowRecordBatchStream, Table};
use crate::arrow::filtering::reader_pruning_predicates;
-use crate::spec::{CoreOptions, DataField, Predicate};
+use crate::spec::{CoreOptions, DataField, MergeEngine, Predicate};
use crate::DataSplit;
/// Table read: reads data from splits (e.g. produced by [TableScan::plan]).
@@ -74,17 +74,18 @@ impl<'a> TableRead<'a> {
pub fn to_arrow(&self, data_splits: &[DataSplit]) ->
crate::Result<ArrowRecordBatchStream> {
let has_primary_keys = !self.table.schema.primary_keys().is_empty();
let core_options = CoreOptions::new(self.table.schema.options());
+ let merge_engine = core_options.merge_engine()?;
// PK table with Deduplicate engine: splits containing level-0 files
// need KeyValueFileReader for sort-merge dedup; splits with only
// compacted files (level > 0) can use the faster DataFileReader.
- // FirstRow engine falls through — scan already skips level-0.
if has_primary_keys
- && core_options
- .merge_engine()
- .is_ok_and(|e| e == crate::spec::MergeEngine::Deduplicate)
+ && matches!(
+ merge_engine,
+ MergeEngine::Deduplicate | MergeEngine::PartialUpdate
+ )
{
- return self.read_pk_deduplicate(data_splits, &core_options);
+ return self.read_pk(data_splits, &core_options);
}
if core_options.data_evolution_enabled() {
@@ -96,11 +97,15 @@ impl<'a> TableRead<'a> {
/// Read PK table with Deduplicate engine: level-0 splits go through
/// KeyValueFileReader for sort-merge dedup, compacted splits use
DataFileReader.
- fn read_pk_deduplicate(
+ fn read_pk(
&self,
data_splits: &[DataSplit],
core_options: &CoreOptions,
) -> crate::Result<ArrowRecordBatchStream> {
+ if core_options.merge_engine()? == MergeEngine::PartialUpdate {
+ return self.read_kv(data_splits, core_options);
+ }
+
let mut kv_splits = Vec::new();
let mut raw_splits = Vec::new();
for split in data_splits {
@@ -134,12 +139,15 @@ impl<'a> TableRead<'a> {
let reader = KeyValueFileReader::new(
self.table.file_io.clone(),
KeyValueReadConfig {
+ table_name: self.table.identifier().full_name(),
+ table_options: self.table.schema().options().clone(),
schema_manager: self.table.schema_manager().clone(),
table_schema_id: self.table.schema().id(),
table_fields: self.table.schema.fields().to_vec(),
read_type: self.read_type().to_vec(),
predicates: self.data_predicates.clone(),
primary_keys: self.table.schema.trimmed_primary_keys(),
+ merge_engine: core_options.merge_engine()?,
sequence_fields: core_options
.sequence_fields()
.iter()
diff --git a/crates/paimon/src/table/table_scan.rs
b/crates/paimon/src/table/table_scan.rs
index a5ff103..be0d2df 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -284,6 +284,22 @@ pub(super) fn can_push_down_limit_hint_for_scan(
data_predicates.is_empty() && row_ranges.is_none()
}
+fn should_skip_level_zero_for_scan(
+ scan_all_files: bool,
+ has_primary_keys: bool,
+ deletion_vectors_enabled: bool,
+ merge_engine: crate::Result<crate::spec::MergeEngine>,
+) -> bool {
+ if scan_all_files {
+ return false;
+ }
+ if !has_primary_keys {
+ return false;
+ }
+
+ deletion_vectors_enabled || merge_engine.is_ok_and(|e| e ==
crate::spec::MergeEngine::FirstRow)
+}
+
/// TableScan for full table scan (no incremental, no predicate).
///
/// Reference:
[pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/table_scan.py)
@@ -474,16 +490,12 @@ impl<'a> TableScan<'a> {
//
// Non-read paths (overwrite, truncate, writer restore) set
scan_all_files=true
// to see all files including level-0, matching Java's CommitScanner
behavior.
- let skip_level_zero = if self.scan_all_files {
- false
- } else if has_primary_keys {
- deletion_vectors_enabled
- || core_options
- .merge_engine()
- .is_ok_and(|e| e == crate::spec::MergeEngine::FirstRow)
- } else {
- false
- };
+ let skip_level_zero = should_skip_level_zero_for_scan(
+ self.scan_all_files,
+ has_primary_keys,
+ deletion_vectors_enabled,
+ core_options.merge_engine(),
+ );
let partition_fields = self.table.schema().partition_fields();
@@ -768,7 +780,7 @@ impl<'a> TableScan<'a> {
#[cfg(test)]
mod tests {
- use super::TableScan;
+ use super::{should_skip_level_zero_for_scan, TableScan};
use crate::catalog::Identifier;
use crate::io::FileIOBuilder;
use crate::spec::{
@@ -986,6 +998,26 @@ mod tests {
);
}
+ #[test]
+ fn test_first_row_skips_level_zero_by_default() {
+ assert!(should_skip_level_zero_for_scan(
+ false,
+ true,
+ false,
+ Ok(crate::spec::MergeEngine::FirstRow),
+ ));
+ }
+
+ #[test]
+ fn test_scan_all_files_disables_first_row_level_zero_skip() {
+ assert!(!should_skip_level_zero_for_scan(
+ true,
+ true,
+ false,
+ Ok(crate::spec::MergeEngine::FirstRow),
+ ));
+ }
+
#[test]
fn test_partition_filter_decode_failure_fails_open() {
let fields = partition_string_field();
diff --git a/crates/paimon/src/table/table_write.rs
b/crates/paimon/src/table/table_write.rs
index f817b75..b89a1c9 100644
--- a/crates/paimon/src/table/table_write.rs
+++ b/crates/paimon/src/table/table_write.rs
@@ -662,6 +662,8 @@ impl TableWrite {
Ok(FileWriter::KeyValue(KeyValueFileWriter::new(
self.table.file_io().clone(),
KeyValueWriteConfig {
+ table_name: self.table.identifier().full_name(),
+ table_options: self.table.schema().options().clone(),
table_location: self.table.location().to_string(),
partition_path,
bucket,
@@ -674,9 +676,15 @@ impl TableWrite {
primary_key_types: self.primary_key_types.clone(),
sequence_field_indices: self.sequence_field_indices.clone(),
merge_engine: self.merge_engine,
+ dynamic_bucket_enabled: matches!(
+ self.bucket_assigner,
+ BucketAssignerEnum::Dynamic(_) |
BucketAssignerEnum::CrossPartition(_)
+ ),
+ deletion_vectors_enabled:
CoreOptions::new(self.table.schema().options())
+ .deletion_vectors_enabled(),
},
next_seq,
- )))
+ )?))
}
}
@@ -898,6 +906,97 @@ mod tests {
assert_eq!(snapshot.id(), 1);
}
+ #[test]
+ fn test_allows_partial_update_fixed_bucket_table() {
+ let table = Table::new(
+ test_file_io(),
+ Identifier::new("default", "test_partial_update_table"),
+ "memory:/test_partial_update_table".to_string(),
+ TableSchema::new(
+ 0,
+ &Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .primary_key(["id"])
+ .option("bucket", "1")
+ .option("merge-engine", "partial-update")
+ .build()
+ .unwrap(),
+ ),
+ None,
+ );
+
+ TableWrite::new(&table, "test-user".to_string(), false).unwrap();
+ }
+
+ #[tokio::test]
+ async fn
test_rejects_partial_update_dynamic_bucket_table_when_creating_writer() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_partial_update_dynamic_bucket_table";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = Table::new(
+ file_io,
+ Identifier::new("default",
"test_partial_update_dynamic_bucket_table"),
+ table_path.to_string(),
+ TableSchema::new(
+ 0,
+ &Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .primary_key(["id"])
+ .option("merge-engine", "partial-update")
+ .build()
+ .unwrap(),
+ ),
+ None,
+ );
+
+ let mut table_write = TableWrite::new(&table, "test-user".to_string(),
false).unwrap();
+ let err = table_write
+ .write_arrow_batch(&make_batch(vec![1], vec![10]))
+ .await
+ .unwrap_err();
+ assert!(
+ matches!(err, crate::Error::Unsupported { message } if
message.contains("bucket=-1"))
+ );
+ }
+
+ #[tokio::test]
+ async fn
test_rejects_partial_update_with_deletion_vectors_when_creating_writer() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_partial_update_dv_table";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = Table::new(
+ file_io,
+ Identifier::new("default", "test_partial_update_dv_table"),
+ table_path.to_string(),
+ TableSchema::new(
+ 0,
+ &Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .primary_key(["id"])
+ .option("bucket", "1")
+ .option("merge-engine", "partial-update")
+ .option("deletion-vectors.enabled", "true")
+ .build()
+ .unwrap(),
+ ),
+ None,
+ );
+
+ let mut table_write = TableWrite::new(&table, "test-user".to_string(),
false).unwrap();
+ let err = table_write
+ .write_arrow_batch(&make_batch(vec![1], vec![10]))
+ .await
+ .unwrap_err();
+ assert!(
+ matches!(err, crate::Error::Unsupported { message } if
message.contains("deletion-vectors.enabled=true"))
+ );
+ }
+
#[tokio::test]
async fn test_write_partitioned() {
let file_io = test_file_io();