This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 4e0cfb4 feat: add write builder configuration (#367)
4e0cfb4 is described below
commit 4e0cfb4e00547db6158772d25d0eb3a514fa11e6
Author: QuakeWang <[email protected]>
AuthorDate: Wed Jun 10 09:50:11 2026 +0800
feat: add write builder configuration (#367)
---
.../datafusion/src/physical_plan/sink.rs | 9 +-
crates/integrations/datafusion/src/sql_context.rs | 7 +-
crates/paimon/src/table/write_builder.rs | 236 ++++++++++++++++++++-
3 files changed, 242 insertions(+), 10 deletions(-)
diff --git a/crates/integrations/datafusion/src/physical_plan/sink.rs b/crates/integrations/datafusion/src/physical_plan/sink.rs
index 07b5acc..4f8ce43 100644
--- a/crates/integrations/datafusion/src/physical_plan/sink.rs
+++ b/crates/integrations/datafusion/src/physical_plan/sink.rs
@@ -80,11 +80,12 @@ impl DataSink for PaimonDataSink {
mut data: SendableRecordBatchStream,
_context: &Arc<TaskContext>,
) -> DFResult<u64> {
- let wb = self.table.new_write_builder();
+ let wb = if self.overwrite {
+ self.table.new_write_builder().with_overwrite()
+ } else {
+ self.table.new_write_builder()
+ };
let mut tw = wb.new_write().map_err(to_datafusion_error)?;
- if self.overwrite {
- tw = tw.with_overwrite();
- }
let mut row_count = 0u64;
while let Some(batch) = data.next().await {
diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs
index 96790de..62e2b7b 100644
--- a/crates/integrations/datafusion/src/sql_context.rs
+++ b/crates/integrations/datafusion/src/sql_context.rs
@@ -1090,11 +1090,8 @@ impl SQLContext {
let mut stream = df.execute_stream().await?;
- let wb = table.new_write_builder();
- let mut tw = wb
- .new_write()
- .map_err(to_datafusion_error)?
- .with_overwrite();
+ let wb = table.new_write_builder().with_overwrite();
+ let mut tw = wb.new_write().map_err(to_datafusion_error)?;
let mut row_count = 0u64;
while let Some(batch_result) = stream.next().await {
diff --git a/crates/paimon/src/table/write_builder.rs b/crates/paimon/src/table/write_builder.rs
index 9f238d6..d24ba28 100644
--- a/crates/paimon/src/table/write_builder.rs
+++ b/crates/paimon/src/table/write_builder.rs
@@ -29,6 +29,7 @@ use uuid::Uuid;
pub struct WriteBuilder<'a> {
    table: &'a Table,
    commit_user: String,
+    /// When `true`, `new_write` calls `TableWrite::with_overwrite` on every
+    /// writer it creates (set via `with_overwrite`).
+    overwrite: bool,
}
impl<'a> WriteBuilder<'a> {
@@ -36,9 +37,39 @@ impl<'a> WriteBuilder<'a> {
Self {
table,
commit_user: Uuid::new_v4().to_string(),
+ overwrite: false,
}
}
+    /// Returns the commit user that every writer and committer created by this
+    /// builder shares.
+    ///
+    /// The value is recorded in snapshot metadata and is used to detect
+    /// duplicate commits.
+    pub fn commit_user(&self) -> &str {
+        self.commit_user.as_str()
+    }
+
+    /// Overrides the commit user shared by writers and committers created by
+    /// this builder.
+    ///
+    /// The value is recorded in snapshot metadata, used for duplicate commit
+    /// detection, and embedded in postpone-bucket data file name prefixes, so
+    /// it should identify a single commit attempt or job instance and must be
+    /// usable as one file name segment.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::ConfigInvalid` when `commit_user` is empty, `.` or
+    /// `..`, has leading or trailing whitespace, or contains `/`, `\` or a
+    /// control character. The builder is consumed either way.
+    pub fn with_commit_user(self, commit_user: impl Into<String>) -> crate::Result<Self> {
+        let user: String = commit_user.into();
+        validate_commit_user(&user).map(|()| Self {
+            commit_user: user,
+            ..self
+        })
+    }
+
+    /// Returns this builder with overwrite enabled for every `TableWrite` that
+    /// `new_write` creates afterwards.
+    ///
+    /// This does not pick the commit kind; that stays explicit at the commit
+    /// call site.
+    pub fn with_overwrite(self) -> Self {
+        Self {
+            overwrite: true,
+            ..self
+        }
+    }
+
/// Create a new TableCommit for committing write results.
pub fn new_commit(&self) -> TableCommit {
TableCommit::new(self.table.clone(), self.commit_user.clone())
@@ -49,6 +80,209 @@ impl<'a> WriteBuilder<'a> {
/// For primary-key tables, sequence numbers are lazily scanned per
partition
/// when the first writer for that partition is created.
pub fn new_write(&self) -> crate::Result<TableWrite> {
-        TableWrite::new(self.table, self.commit_user.clone())
+        let mut table_write = TableWrite::new(self.table, self.commit_user.clone())?;
+        // Propagate the builder-level overwrite flag to the freshly created writer.
+        if self.overwrite {
+            table_write = table_write.with_overwrite();
+        }
+        Ok(table_write)
+    }
+}
+
+/// Checks that `commit_user` can be embedded as a single file name segment.
+///
+/// The commit user is embedded in postpone-bucket data file name prefixes, so
+/// it must not be empty, `.` or `..`, must not carry leading or trailing
+/// whitespace, and must not contain path separators (`/`, `\`) or control
+/// characters.
+///
+/// # Errors
+///
+/// Returns `Error::ConfigInvalid`, naming the rejected value, when any of the
+/// rules above is violated.
+fn validate_commit_user(commit_user: &str) -> crate::Result<()> {
+    let is_invalid = matches!(commit_user, "" | "." | "..")
+        || commit_user.trim() != commit_user
+        || commit_user
+            .chars()
+            .any(|c| matches!(c, '/' | '\\') || c.is_control());
+
+    if is_invalid {
+        // Debug-format the value so whitespace and control characters stay
+        // visible in the error, which makes misconfiguration easy to spot.
+        return Err(crate::Error::ConfigInvalid {
+            message: format!("commit_user {commit_user:?} must be a safe file name segment"),
+        });
+    }
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::catalog::Identifier;
+    use crate::io::{FileIO, FileIOBuilder};
+    use crate::spec::{CommitKind, DataType, IntType, Schema, TableSchema, POSTPONE_BUCKET};
+    use arrow_array::{Int32Array, RecordBatch};
+    use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
+    use std::sync::Arc;
+
+    /// `FileIO` backed by the in-memory scheme.
+    fn test_file_io() -> FileIO {
+        FileIOBuilder::new("memory").build().unwrap()
+    }
+
+    /// Creates the `snapshot/` and `manifest/` directories under `table_path`.
+    async fn setup_dirs(file_io: &FileIO, table_path: &str) {
+        for sub_dir in ["snapshot", "manifest"] {
+            file_io
+                .mkdirs(&format!("{table_path}/{sub_dir}/"))
+                .await
+                .unwrap();
+        }
+    }
+
+    /// Builds a batch with non-nullable Int32 columns `id` and `value`.
+    fn make_batch(ids: Vec<i32>, values: Vec<i32>) -> RecordBatch {
+        let fields = vec![
+            ArrowField::new("id", ArrowDataType::Int32, false),
+            ArrowField::new("value", ArrowDataType::Int32, false),
+        ];
+        let arrow_schema = Arc::new(ArrowSchema::new(fields));
+        RecordBatch::try_new(
+            arrow_schema,
+            vec![
+                Arc::new(Int32Array::from(ids)),
+                Arc::new(Int32Array::from(values)),
+            ],
+        )
+        .unwrap()
+    }
+
+    /// Wraps `schema` (as schema id 0) into table `default.<table_name>` at `table_path`.
+    fn table_with_schema(
+        file_io: &FileIO,
+        table_path: &str,
+        table_name: &str,
+        schema: &Schema,
+    ) -> Table {
+        Table::new(
+            file_io.clone(),
+            Identifier::new("default", table_name),
+            table_path.to_string(),
+            TableSchema::new(0, schema),
+            None,
+        )
+    }
+
+    /// Primary-key table configured with `bucket = -2`.
+    fn test_postpone_pk_table(file_io: &FileIO, table_path: &str) -> Table {
+        let schema = Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .primary_key(["id"])
+            .option("bucket", "-2")
+            .build()
+            .unwrap();
+        table_with_schema(file_io, table_path, "test_postpone_table", &schema)
+    }
+
+    /// Primary-key table with `bucket = 1` and `changelog-producer = input`.
+    fn input_changelog_pk_table(file_io: &FileIO, table_path: &str) -> Table {
+        let schema = Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .primary_key(["id"])
+            .option("bucket", "1")
+            .option("changelog-producer", "input")
+            .build()
+            .unwrap();
+        table_with_schema(file_io, table_path, "test_input_changelog", &schema)
+    }
+
+    #[test]
+    fn test_with_commit_user_rejects_invalid_file_name_segments() {
+        let table = test_postpone_pk_table(&test_file_io(), "memory:/test_invalid_commit_user");
+        let rejected = [
+            "",
+            ".",
+            "..",
+            "job/1",
+            "job\\1",
+            " job",
+            "job ",
+            "job\n1",
+            "job\u{7f}",
+        ];
+        for invalid_commit_user in rejected {
+            let err = table
+                .new_write_builder()
+                .with_commit_user(invalid_commit_user)
+                .err()
+                .unwrap_or_else(|| {
+                    panic!("Expected commit_user {invalid_commit_user:?} to be rejected")
+                });
+            assert!(
+                matches!(err, crate::Error::ConfigInvalid { ref message }
+                    if message.contains("commit_user") && message.contains("file name segment")),
+                "Expected ConfigInvalid for commit_user {invalid_commit_user:?}, got: {err:?}"
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_custom_commit_user_is_shared_by_write_and_commit() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_write_builder_commit_user";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = test_postpone_pk_table(&file_io, table_path);
+        let builder = table
+            .new_write_builder()
+            .with_commit_user("my-commit-user")
+            .unwrap();
+        assert_eq!(builder.commit_user(), "my-commit-user");
+
+        let mut table_write = builder.new_write().unwrap();
+        let batch = make_batch(vec![3, 1, 2], vec![30, 10, 20]);
+        table_write.write_arrow_batch(&batch).await.unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        assert_eq!(messages[0].bucket, POSTPONE_BUCKET);
+        let first_file_name = &messages[0].new_files[0].file_name;
+        assert!(
+            first_file_name.starts_with("data-u-my-commit-user-s-"),
+            "Expected custom commit user in file name, got: {}",
+            first_file_name
+        );
+
+        builder.new_commit().commit(messages).await.unwrap();
+
+        let manager = crate::table::SnapshotManager::new(file_io.clone(), table_path.to_string());
+        let snapshot = manager.get_latest_snapshot().await.unwrap().unwrap();
+        assert_eq!(snapshot.commit_user(), "my-commit-user");
+    }
+
+    #[tokio::test]
+    async fn test_with_overwrite_marks_new_write_as_overwrite_aware() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_write_builder_overwrite";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = input_changelog_pk_table(&file_io, table_path);
+        let builder = table.new_write_builder().with_overwrite();
+        let mut table_write = builder.new_write().unwrap();
+        let batch = make_batch(vec![1], vec![10]);
+        table_write.write_arrow_batch(&batch).await.unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        assert_eq!(messages.len(), 1);
+        assert_eq!(messages[0].new_files.len(), 1);
+        assert!(
+            messages[0].new_changelog_files.is_empty(),
+            "Overwrite-aware writer must not produce input changelog files"
+        );
+
+        builder.new_commit().commit(messages).await.unwrap();
+
+        let manager = crate::table::SnapshotManager::new(file_io.clone(), table_path.to_string());
+        let snapshot = manager.get_latest_snapshot().await.unwrap().unwrap();
+        // The overwrite flag only affects the writer; the commit kind stays APPEND.
+        assert_eq!(snapshot.commit_kind(), &CommitKind::APPEND);
    }
}