This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ecbf458 feat(write): add write pipeline with DataFusion INSERT
INTO/OVERWRITE support (#234)
ecbf458 is described below
commit ecbf45864177544733818737add939a5f532a58c
Author: Jingsong Lee <[email protected]>
AuthorDate: Sun Apr 12 19:39:24 2026 +0800
feat(write): add write pipeline with DataFusion INSERT INTO/OVERWRITE
support (#234)
Add TableWrite for writing Arrow RecordBatches to Paimon append-only
tables. Each (partition, bucket) pair gets its own DataFileWriter with
direct writes (matching delta-rs DeltaWriter pattern). File rolling
uses tokio::spawn for background close, and prepare_commit uses
try_join_all for parallel finalization across partition writers.
Key components:
- TableWrite: routes batches by partition/bucket, holds DataFileWriters
- DataFileWriter: manages parquet file lifecycle with rolling support
- WriteBuilder: creates TableWrite and TableCommit instances
- PaimonDataSink: DataFusion DataSink integration for INSERT/OVERWRITE
- FormatFileWriter: extended with flush() and in_progress_size()
Configurable options via CoreOptions:
- file.compression (default: zstd)
- target-file-size (default: 256MB)
- write.parquet-buffer-size (default: 256MB)
Includes E2E integration tests for unpartitioned, partitioned,
fixed-bucket, multi-commit, column projection, and bucket filtering.
---
Cargo.toml | 2 +
crates/integration_tests/Cargo.toml | 1 +
crates/integration_tests/tests/append_tables.rs | 614 +++++++++++++
crates/integrations/datafusion/src/lib.rs | 2 +-
.../datafusion/src/physical_plan/mod.rs | 2 +
.../datafusion/src/physical_plan/sink.rs | 109 +++
crates/integrations/datafusion/src/table/mod.rs | 271 +++++-
crates/paimon/Cargo.toml | 4 +-
crates/paimon/src/arrow/format/mod.rs | 47 +-
crates/paimon/src/arrow/format/parquet.rs | 173 +++-
crates/paimon/src/io/file_io.rs | 20 +-
crates/paimon/src/lib.rs | 3 +-
crates/paimon/src/spec/binary_row.rs | 244 +++--
crates/paimon/src/spec/core_options.rs | 61 +-
crates/paimon/src/spec/partition_utils.rs | 1 +
crates/paimon/src/spec/schema.rs | 20 +
crates/paimon/src/table/bucket_filter.rs | 26 +-
crates/paimon/src/table/mod.rs | 2 +
crates/paimon/src/table/table_commit.rs | 209 +++--
crates/paimon/src/table/table_scan.rs | 60 ++
crates/paimon/src/table/table_write.rs | 998 +++++++++++++++++++++
crates/paimon/src/table/write_builder.rs | 24 +-
22 files changed, 2731 insertions(+), 162 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 5677a5e..83fa44b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,7 +34,9 @@ arrow-buffer = "57.0"
arrow-schema = "57.0"
arrow-cast = "57.0"
arrow-ord = "57.0"
+arrow-select = "57.0"
datafusion = "52.3.0"
datafusion-ffi = "52.3.0"
parquet = "57.0"
tokio = "1.39.2"
+tokio-util = "0.7"
diff --git a/crates/integration_tests/Cargo.toml
b/crates/integration_tests/Cargo.toml
index 092ad94..7c60a53 100644
--- a/crates/integration_tests/Cargo.toml
+++ b/crates/integration_tests/Cargo.toml
@@ -26,6 +26,7 @@ homepage.workspace = true
[dependencies]
paimon = { path = "../paimon" }
arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
futures = "0.3"
diff --git a/crates/integration_tests/tests/append_tables.rs
b/crates/integration_tests/tests/append_tables.rs
new file mode 100644
index 0000000..b2185de
--- /dev/null
+++ b/crates/integration_tests/tests/append_tables.rs
@@ -0,0 +1,614 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! E2E integration tests for append-only (no primary key) tables.
+//!
+//! Covers: unpartitioned, partitioned, bucket=-1, fixed bucket,
+//! multiple commits, column projection, and bucket predicate filtering.
+
+use arrow_array::{Array, Int32Array, RecordBatch, StringArray};
+use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as
ArrowSchema};
+use futures::TryStreamExt;
+use paimon::catalog::Identifier;
+use paimon::io::FileIOBuilder;
+use paimon::spec::{DataType, IntType, Schema, TableSchema, VarCharType};
+use paimon::table::Table;
+use std::sync::Arc;
+
+// ---------------------------------------------------------------------------
+// Helpers
+// ---------------------------------------------------------------------------
+
+fn memory_file_io() -> paimon::io::FileIO {
+ FileIOBuilder::new("memory").build().unwrap()
+}
+
+async fn setup_dirs(file_io: &paimon::io::FileIO, table_path: &str) {
+ file_io
+ .mkdirs(&format!("{table_path}/snapshot/"))
+ .await
+ .unwrap();
+ file_io
+ .mkdirs(&format!("{table_path}/manifest/"))
+ .await
+ .unwrap();
+}
+
+fn make_table(file_io: &paimon::io::FileIO, table_path: &str, schema:
TableSchema) -> Table {
+ Table::new(
+ file_io.clone(),
+ Identifier::new("default", "test"),
+ table_path.to_string(),
+ schema,
+ None,
+ )
+}
+
+fn int_batch(ids: Vec<i32>, values: Vec<i32>) -> RecordBatch {
+ let schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("id", ArrowDataType::Int32, false),
+ ArrowField::new("value", ArrowDataType::Int32, false),
+ ]));
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(Int32Array::from(ids)),
+ Arc::new(Int32Array::from(values)),
+ ],
+ )
+ .unwrap()
+}
+
+fn partitioned_batch(pts: Vec<&str>, ids: Vec<i32>) -> RecordBatch {
+ let schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("pt", ArrowDataType::Utf8, false),
+ ArrowField::new("id", ArrowDataType::Int32, false),
+ ]));
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(StringArray::from(pts)),
+ Arc::new(Int32Array::from(ids)),
+ ],
+ )
+ .unwrap()
+}
+
+fn collect_int_col(batches: &[RecordBatch], col: &str) -> Vec<i32> {
+ let mut vals: Vec<i32> = batches
+ .iter()
+ .flat_map(|b| {
+ let idx = b.schema().index_of(col).unwrap();
+ b.column(idx)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .values()
+ .to_vec()
+ })
+ .collect();
+ vals.sort();
+ vals
+}
+
+fn collect_string_col(batches: &[RecordBatch], col: &str) -> Vec<String> {
+ let mut vals: Vec<String> = batches
+ .iter()
+ .flat_map(|b| {
+ let idx = b.schema().index_of(col).unwrap();
+ let arr = b
+ .column(idx)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ (0..arr.len())
+ .map(|i| arr.value(i).to_string())
+ .collect::<Vec<_>>()
+ })
+ .collect();
+ vals.sort();
+ vals
+}
+
+/// Write batches → commit → scan → read, return all batches.
+async fn write_commit_read(table: &Table, batches: Vec<RecordBatch>) ->
Vec<RecordBatch> {
+ let wb = table.new_write_builder();
+ let mut tw = wb.new_write().unwrap();
+ for batch in &batches {
+ tw.write_arrow_batch(batch).await.unwrap();
+ }
+ wb.new_commit()
+ .commit(tw.prepare_commit().await.unwrap())
+ .await
+ .unwrap();
+
+ let rb = table.new_read_builder();
+ let plan = rb.new_scan().plan().await.unwrap();
+ let read = rb.new_read().unwrap();
+ read.to_arrow(plan.splits())
+ .unwrap()
+ .try_collect()
+ .await
+ .unwrap()
+}
+
+// ---------------------------------------------------------------------------
+// Unpartitioned, bucket = -1 (default)
+// ---------------------------------------------------------------------------
+
+fn unpartitioned_schema() -> TableSchema {
+ let schema = Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .build()
+ .unwrap();
+ TableSchema::new(0, &schema)
+}
+
+#[tokio::test]
+async fn test_unpartitioned_single_batch() {
+ let file_io = memory_file_io();
+ let path = "memory:/append_unpart_single";
+ setup_dirs(&file_io, path).await;
+ let table = make_table(&file_io, path, unpartitioned_schema());
+
+ let result = write_commit_read(&table, vec![int_batch(vec![1, 2, 3],
vec![10, 20, 30])]).await;
+
+ assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3]);
+ assert_eq!(collect_int_col(&result, "value"), vec![10, 20, 30]);
+}
+
+#[tokio::test]
+async fn test_unpartitioned_multiple_batches() {
+ let file_io = memory_file_io();
+ let path = "memory:/append_unpart_multi";
+ setup_dirs(&file_io, path).await;
+ let table = make_table(&file_io, path, unpartitioned_schema());
+
+ let result = write_commit_read(
+ &table,
+ vec![
+ int_batch(vec![1, 2], vec![10, 20]),
+ int_batch(vec![3, 4, 5], vec![30, 40, 50]),
+ ],
+ )
+ .await;
+
+ assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4, 5]);
+}
+
+#[tokio::test]
+async fn test_unpartitioned_two_commits() {
+ let file_io = memory_file_io();
+ let path = "memory:/append_unpart_two_commits";
+ setup_dirs(&file_io, path).await;
+ let table = make_table(&file_io, path, unpartitioned_schema());
+
+ // First commit
+ let wb = table.new_write_builder();
+ let mut tw = wb.new_write().unwrap();
+ tw.write_arrow_batch(&int_batch(vec![1, 2], vec![10, 20]))
+ .await
+ .unwrap();
+ wb.new_commit()
+ .commit(tw.prepare_commit().await.unwrap())
+ .await
+ .unwrap();
+
+ // Second commit
+ let mut tw2 = wb.new_write().unwrap();
+ tw2.write_arrow_batch(&int_batch(vec![3, 4], vec![30, 40]))
+ .await
+ .unwrap();
+ wb.new_commit()
+ .commit(tw2.prepare_commit().await.unwrap())
+ .await
+ .unwrap();
+
+ // Read all
+ let rb = table.new_read_builder();
+ let plan = rb.new_scan().plan().await.unwrap();
+ let read = rb.new_read().unwrap();
+ let result: Vec<RecordBatch> = read
+ .to_arrow(plan.splits())
+ .unwrap()
+ .try_collect()
+ .await
+ .unwrap();
+
+ assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4]);
+}
+
+#[tokio::test]
+async fn test_unpartitioned_projection() {
+ let file_io = memory_file_io();
+ let path = "memory:/append_unpart_proj";
+ setup_dirs(&file_io, path).await;
+ let table = make_table(&file_io, path, unpartitioned_schema());
+
+ // Write
+ let wb = table.new_write_builder();
+ let mut tw = wb.new_write().unwrap();
+ tw.write_arrow_batch(&int_batch(vec![1, 2, 3], vec![10, 20, 30]))
+ .await
+ .unwrap();
+ wb.new_commit()
+ .commit(tw.prepare_commit().await.unwrap())
+ .await
+ .unwrap();
+
+ // Read with projection
+ let mut rb = table.new_read_builder();
+ rb.with_projection(&["value"]);
+ let plan = rb.new_scan().plan().await.unwrap();
+ let read = rb.new_read().unwrap();
+ let result: Vec<RecordBatch> = read
+ .to_arrow(plan.splits())
+ .unwrap()
+ .try_collect()
+ .await
+ .unwrap();
+
+ assert_eq!(result[0].schema().fields().len(), 1);
+ assert_eq!(result[0].schema().field(0).name(), "value");
+ assert_eq!(collect_int_col(&result, "value"), vec![10, 20, 30]);
+}
+
+// ---------------------------------------------------------------------------
+// Unpartitioned, fixed bucket
+// ---------------------------------------------------------------------------
+
+fn fixed_bucket_schema(buckets: i32) -> TableSchema {
+ let schema = Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .option("bucket", buckets.to_string())
+ .option("bucket-key", "id")
+ .build()
+ .unwrap();
+ TableSchema::new(0, &schema)
+}
+
+#[tokio::test]
+async fn test_fixed_bucket_write_read() {
+ let file_io = memory_file_io();
+ let path = "memory:/append_fixed_bucket";
+ setup_dirs(&file_io, path).await;
+ let table = make_table(&file_io, path, fixed_bucket_schema(4));
+
+ let result = write_commit_read(
+ &table,
+ vec![int_batch(
+ vec![1, 2, 3, 4, 5, 6, 7, 8],
+ vec![10, 20, 30, 40, 50, 60, 70, 80],
+ )],
+ )
+ .await;
+
+ assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4, 5, 6, 7, 8]);
+}
+
+#[tokio::test]
+async fn test_fixed_bucket_scan_filters_by_bucket() {
+ use paimon::spec::{Datum, PredicateBuilder};
+
+ let file_io = memory_file_io();
+ let path = "memory:/append_bucket_filter";
+ setup_dirs(&file_io, path).await;
+ let table = make_table(&file_io, path, fixed_bucket_schema(4));
+
+ // Write enough data to spread across buckets
+ let wb = table.new_write_builder();
+ let mut tw = wb.new_write().unwrap();
+ tw.write_arrow_batch(&int_batch(
+ vec![1, 2, 3, 4, 5, 6, 7, 8],
+ vec![10, 20, 30, 40, 50, 60, 70, 80],
+ ))
+ .await
+ .unwrap();
+ wb.new_commit()
+ .commit(tw.prepare_commit().await.unwrap())
+ .await
+ .unwrap();
+
+ // Full scan — should have multiple buckets
+ let full_rb = table.new_read_builder();
+ let full_plan = full_rb.new_scan().plan().await.unwrap();
+ let all_buckets: std::collections::HashSet<i32> =
+ full_plan.splits().iter().map(|s| s.bucket()).collect();
+
+ if all_buckets.len() <= 1 {
+ // All rows hashed to same bucket — can't test filtering
+ return;
+ }
+
+ // Filter by id = 1 — should narrow to one bucket
+ let pb = PredicateBuilder::new(table.schema().fields());
+ let filter = pb.equal("id", Datum::Int(1)).unwrap();
+
+ let mut rb = table.new_read_builder();
+ rb.with_filter(filter);
+ let plan = rb.new_scan().plan().await.unwrap();
+ let filtered_buckets: std::collections::HashSet<i32> =
+ plan.splits().iter().map(|s| s.bucket()).collect();
+
+ assert_eq!(
+ filtered_buckets.len(),
+ 1,
+ "Bucket predicate should narrow to one bucket, got:
{filtered_buckets:?}"
+ );
+ assert!(filtered_buckets.is_subset(&all_buckets));
+
+ // Read and verify id=1 is in the result
+ let read = rb.new_read().unwrap();
+ let result: Vec<RecordBatch> = read
+ .to_arrow(plan.splits())
+ .unwrap()
+ .try_collect()
+ .await
+ .unwrap();
+ let ids = collect_int_col(&result, "id");
+ assert!(ids.contains(&1));
+}
+
+// ---------------------------------------------------------------------------
+// Partitioned, bucket = -1
+// ---------------------------------------------------------------------------
+
+fn partitioned_schema() -> TableSchema {
+ let schema = Schema::builder()
+ .column("pt", DataType::VarChar(VarCharType::string_type()))
+ .column("id", DataType::Int(IntType::new()))
+ .partition_keys(["pt"])
+ .build()
+ .unwrap();
+ TableSchema::new(0, &schema)
+}
+
+#[tokio::test]
+async fn test_partitioned_write_read() {
+ let file_io = memory_file_io();
+ let path = "memory:/append_partitioned";
+ setup_dirs(&file_io, path).await;
+ let table = make_table(&file_io, path, partitioned_schema());
+
+ let result = write_commit_read(
+ &table,
+ vec![partitioned_batch(
+ vec!["a", "b", "a", "b"],
+ vec![1, 2, 3, 4],
+ )],
+ )
+ .await;
+
+ let total: usize = result.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total, 4);
+ assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4]);
+ assert_eq!(collect_string_col(&result, "pt"), vec!["a", "a", "b", "b"]);
+}
+
+#[tokio::test]
+async fn test_partitioned_two_commits() {
+ let file_io = memory_file_io();
+ let path = "memory:/append_part_two_commits";
+ setup_dirs(&file_io, path).await;
+ let table = make_table(&file_io, path, partitioned_schema());
+
+ let wb = table.new_write_builder();
+
+ // First commit: partition "a"
+ let mut tw1 = wb.new_write().unwrap();
+ tw1.write_arrow_batch(&partitioned_batch(vec!["a", "a"], vec![1, 2]))
+ .await
+ .unwrap();
+ wb.new_commit()
+ .commit(tw1.prepare_commit().await.unwrap())
+ .await
+ .unwrap();
+
+ // Second commit: partition "b"
+ let mut tw2 = wb.new_write().unwrap();
+ tw2.write_arrow_batch(&partitioned_batch(vec!["b", "b"], vec![3, 4]))
+ .await
+ .unwrap();
+ wb.new_commit()
+ .commit(tw2.prepare_commit().await.unwrap())
+ .await
+ .unwrap();
+
+ // Read all
+ let rb = table.new_read_builder();
+ let plan = rb.new_scan().plan().await.unwrap();
+ let read = rb.new_read().unwrap();
+ let result: Vec<RecordBatch> = read
+ .to_arrow(plan.splits())
+ .unwrap()
+ .try_collect()
+ .await
+ .unwrap();
+
+ assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4]);
+ assert_eq!(collect_string_col(&result, "pt"), vec!["a", "a", "b", "b"]);
+}
+
+#[tokio::test]
+async fn test_partitioned_scan_partition_filter() {
+ use paimon::spec::{Datum, PredicateBuilder};
+
+ let file_io = memory_file_io();
+ let path = "memory:/append_part_filter";
+ setup_dirs(&file_io, path).await;
+ let table = make_table(&file_io, path, partitioned_schema());
+
+ // Write data to two partitions
+ let wb = table.new_write_builder();
+ let mut tw = wb.new_write().unwrap();
+ tw.write_arrow_batch(&partitioned_batch(
+ vec!["a", "b", "a", "b"],
+ vec![1, 2, 3, 4],
+ ))
+ .await
+ .unwrap();
+ wb.new_commit()
+ .commit(tw.prepare_commit().await.unwrap())
+ .await
+ .unwrap();
+
+ // Filter by pt = "a"
+ let pb = PredicateBuilder::new(table.schema().fields());
+ let filter = pb.equal("pt", Datum::String("a".into())).unwrap();
+
+ let mut rb = table.new_read_builder();
+ rb.with_filter(filter);
+ let plan = rb.new_scan().plan().await.unwrap();
+
+ // Only partition "a" splits should survive
+ for split in plan.splits() {
+ let pt = split.partition().get_string(0).unwrap().to_string();
+ assert_eq!(pt, "a");
+ }
+
+ let read = rb.new_read().unwrap();
+ let result: Vec<RecordBatch> = read
+ .to_arrow(plan.splits())
+ .unwrap()
+ .try_collect()
+ .await
+ .unwrap();
+
+ assert_eq!(collect_int_col(&result, "id"), vec![1, 3]);
+ assert_eq!(collect_string_col(&result, "pt"), vec!["a", "a"]);
+}
+
+// ---------------------------------------------------------------------------
+// Partitioned + fixed bucket
+// ---------------------------------------------------------------------------
+
+fn partitioned_bucket_schema(buckets: i32) -> TableSchema {
+ let schema = Schema::builder()
+ .column("pt", DataType::VarChar(VarCharType::string_type()))
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .partition_keys(["pt"])
+ .option("bucket", buckets.to_string())
+ .option("bucket-key", "id")
+ .build()
+ .unwrap();
+ TableSchema::new(0, &schema)
+}
+
+fn partitioned_value_batch(pts: Vec<&str>, ids: Vec<i32>, values: Vec<i32>) ->
RecordBatch {
+ let schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("pt", ArrowDataType::Utf8, false),
+ ArrowField::new("id", ArrowDataType::Int32, false),
+ ArrowField::new("value", ArrowDataType::Int32, false),
+ ]));
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(StringArray::from(pts)),
+ Arc::new(Int32Array::from(ids)),
+ Arc::new(Int32Array::from(values)),
+ ],
+ )
+ .unwrap()
+}
+
+#[tokio::test]
+async fn test_partitioned_fixed_bucket_write_read() {
+ let file_io = memory_file_io();
+ let path = "memory:/append_part_bucket";
+ setup_dirs(&file_io, path).await;
+ let table = make_table(&file_io, path, partitioned_bucket_schema(2));
+
+ let wb = table.new_write_builder();
+ let mut tw = wb.new_write().unwrap();
+ tw.write_arrow_batch(&partitioned_value_batch(
+ vec!["a", "a", "b", "b"],
+ vec![1, 2, 3, 4],
+ vec![10, 20, 30, 40],
+ ))
+ .await
+ .unwrap();
+ wb.new_commit()
+ .commit(tw.prepare_commit().await.unwrap())
+ .await
+ .unwrap();
+
+ let rb = table.new_read_builder();
+ let plan = rb.new_scan().plan().await.unwrap();
+ let read = rb.new_read().unwrap();
+ let result: Vec<RecordBatch> = read
+ .to_arrow(plan.splits())
+ .unwrap()
+ .try_collect()
+ .await
+ .unwrap();
+
+ assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4]);
+ assert_eq!(collect_int_col(&result, "value"), vec![10, 20, 30, 40]);
+}
+
+// ---------------------------------------------------------------------------
+// Unsupported: primary key table should be rejected
+// ---------------------------------------------------------------------------
+
+#[tokio::test]
+async fn test_reject_primary_key_table() {
+ let schema = Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .primary_key(["id"])
+ .build()
+ .unwrap();
+ let table_schema = TableSchema::new(0, &schema);
+
+ let file_io = memory_file_io();
+ let path = "memory:/append_reject_pk";
+ let table = make_table(&file_io, path, table_schema);
+
+ let result = table.new_write_builder().new_write();
+ assert!(result.is_err());
+ let err = result.err().unwrap();
+ assert!(
+ matches!(&err, paimon::Error::Unsupported { message } if
message.contains("primary keys")),
+ "Expected Unsupported error for PK table, got: {err:?}"
+ );
+}
+
+#[tokio::test]
+async fn test_reject_fixed_bucket_without_bucket_key() {
+ let schema = Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .option("bucket", "4")
+ .build()
+ .unwrap();
+ let table_schema = TableSchema::new(0, &schema);
+
+ let file_io = memory_file_io();
+ let path = "memory:/append_reject_no_bucket_key";
+ let table = make_table(&file_io, path, table_schema);
+
+ let result = table.new_write_builder().new_write();
+ assert!(result.is_err());
+ let err = result.err().unwrap();
+ assert!(
+ matches!(&err, paimon::Error::Unsupported { message } if
message.contains("bucket-key")),
+ "Expected Unsupported error for missing bucket-key, got: {err:?}"
+ );
+}
diff --git a/crates/integrations/datafusion/src/lib.rs
b/crates/integrations/datafusion/src/lib.rs
index abcf744..4e9fdb3 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Apache Paimon DataFusion Integration (read-only).
+//! Apache Paimon DataFusion Integration.
//!
//! Register a Paimon table as a DataFusion table provider to query it with
SQL or DataFrame API.
//!
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs
b/crates/integrations/datafusion/src/physical_plan/mod.rs
index 48aa546..2fa35bf 100644
--- a/crates/integrations/datafusion/src/physical_plan/mod.rs
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -16,5 +16,7 @@
// under the License.
pub(crate) mod scan;
+pub(crate) mod sink;
pub use scan::PaimonTableScan;
+pub use sink::PaimonDataSink;
diff --git a/crates/integrations/datafusion/src/physical_plan/sink.rs
b/crates/integrations/datafusion/src/physical_plan/sink.rs
new file mode 100644
index 0000000..a0c5c9b
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/sink.rs
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DataSink implementation for writing to Paimon tables via DataFusion.
+
+use std::any::Any;
+use std::fmt;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::datasource::sink::DataSink;
+use datafusion::error::Result as DFResult;
+use datafusion::execution::SendableRecordBatchStream;
+use datafusion::execution::TaskContext;
+use datafusion::physical_plan::DisplayAs;
+use futures::StreamExt;
+use paimon::table::Table;
+
+use crate::error::to_datafusion_error;
+
+/// DataSink that writes RecordBatches to a Paimon table.
+///
+/// Uses the Paimon write pipeline: `WriteBuilder` → `TableWrite` →
`TableCommit`.
+/// `TableWrite` routes each (partition, bucket) pair to its own
+/// `DataFileWriter`; writers are finalized in parallel at prepare_commit.
+#[derive(Debug)]
+pub struct PaimonDataSink {
+ table: Table,
+ schema: ArrowSchemaRef,
+ overwrite: bool,
+}
+
+impl PaimonDataSink {
+ pub fn new(table: Table, schema: ArrowSchemaRef, overwrite: bool) -> Self {
+ Self {
+ table,
+ schema,
+ overwrite,
+ }
+ }
+}
+
+impl DisplayAs for PaimonDataSink {
+ fn fmt_as(
+ &self,
+ _t: datafusion::physical_plan::DisplayFormatType,
+ f: &mut fmt::Formatter,
+ ) -> fmt::Result {
+ write!(f, "PaimonDataSink: table={}", self.table.identifier())
+ }
+}
+
+#[async_trait]
+impl DataSink for PaimonDataSink {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> &ArrowSchemaRef {
+ &self.schema
+ }
+
+ async fn write_all(
+ &self,
+ mut data: SendableRecordBatchStream,
+ _context: &Arc<TaskContext>,
+ ) -> DFResult<u64> {
+ let wb = self.table.new_write_builder();
+ let mut tw = wb.new_write().map_err(to_datafusion_error)?;
+ let mut row_count = 0u64;
+
+ while let Some(batch) = data.next().await {
+ let batch = batch?;
+ row_count += batch.num_rows() as u64;
+ tw.write_arrow_batch(&batch)
+ .await
+ .map_err(to_datafusion_error)?;
+ }
+
+ let messages = tw.prepare_commit().await.map_err(to_datafusion_error)?;
+ let commit = wb.new_commit();
+
+ if self.overwrite {
+ commit
+ .overwrite(messages)
+ .await
+ .map_err(to_datafusion_error)?;
+ } else {
+ commit.commit(messages).await.map_err(to_datafusion_error)?;
+ }
+
+ Ok(row_count)
+ }
+}
diff --git a/crates/integrations/datafusion/src/table/mod.rs
b/crates/integrations/datafusion/src/table/mod.rs
index 65eb07f..275f943 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Paimon table provider for DataFusion (read-only).
+//! Paimon table provider for DataFusion.
use std::any::Any;
use std::sync::Arc;
@@ -23,12 +23,16 @@ use std::sync::Arc;
use async_trait::async_trait;
use datafusion::arrow::datatypes::{Field, Schema, SchemaRef as ArrowSchemaRef};
use datafusion::catalog::Session;
+use datafusion::datasource::sink::DataSinkExec;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use paimon::table::Table;
+use crate::physical_plan::PaimonDataSink;
+
use crate::error::to_datafusion_error;
use crate::filter_pushdown::{build_pushed_predicate, classify_filter_pushdown};
use crate::physical_plan::PaimonTableScan;
@@ -178,6 +182,25 @@ impl TableProvider for PaimonTableProvider {
)
}
+ async fn insert_into(
+ &self,
+ _state: &dyn Session,
+ input: Arc<dyn ExecutionPlan>,
+ insert_op: InsertOp,
+ ) -> DFResult<Arc<dyn ExecutionPlan>> {
+ let overwrite = match insert_op {
+ InsertOp::Append => false,
+ InsertOp::Overwrite => true,
+ other => {
+ return
Err(datafusion::error::DataFusionError::NotImplemented(format!(
+ "{other} is not supported for Paimon tables"
+ )));
+ }
+ };
+ let sink = PaimonDataSink::new(self.table.clone(),
self.schema.clone(), overwrite);
+ Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None)))
+ }
+
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
@@ -373,4 +396,250 @@ mod tests {
assert_eq!(scan.pushed_predicate(), Some(&expected));
}
+
+ #[tokio::test]
+ async fn test_insert_into_and_read_back() {
+ use paimon::io::FileIOBuilder;
+ use paimon::spec::{DataType, IntType, Schema as PaimonSchema,
TableSchema};
+
+ let file_io = FileIOBuilder::new("memory").build().unwrap();
+ let table_path = "memory:/test_df_insert_into";
+ file_io
+ .mkdirs(&format!("{table_path}/snapshot/"))
+ .await
+ .unwrap();
+ file_io
+ .mkdirs(&format!("{table_path}/manifest/"))
+ .await
+ .unwrap();
+
+ let schema = PaimonSchema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .build()
+ .unwrap();
+ let table_schema = TableSchema::new(0, &schema);
+ let table = paimon::table::Table::new(
+ file_io,
+ Identifier::new("default", "test_insert"),
+ table_path.to_string(),
+ table_schema,
+ None,
+ );
+
+ let provider = PaimonTableProvider::try_new(table).unwrap();
+ let ctx = SessionContext::new();
+ ctx.register_table("t", Arc::new(provider)).unwrap();
+
+ // INSERT INTO
+ let result = ctx
+ .sql("INSERT INTO t VALUES (1, 10), (2, 20), (3, 30)")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ // Verify count output
+ let count_array = result[0]
+ .column(0)
+ .as_any()
+ .downcast_ref::<datafusion::arrow::array::UInt64Array>()
+ .unwrap();
+ assert_eq!(count_array.value(0), 3);
+
+ // Read back
+ let batches = ctx
+ .sql("SELECT id, value FROM t ORDER BY id")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ let mut rows = Vec::new();
+ for batch in &batches {
+ let ids = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<datafusion::arrow::array::Int32Array>()
+ .unwrap();
+ let vals = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<datafusion::arrow::array::Int32Array>()
+ .unwrap();
+ for i in 0..batch.num_rows() {
+ rows.push((ids.value(i), vals.value(i)));
+ }
+ }
+ assert_eq!(rows, vec![(1, 10), (2, 20), (3, 30)]);
+ }
+
+ #[tokio::test]
+ async fn test_insert_overwrite() {
+ use paimon::io::FileIOBuilder;
+ use paimon::spec::{DataType, IntType, Schema as PaimonSchema,
TableSchema, VarCharType};
+
+ let file_io = FileIOBuilder::new("memory").build().unwrap();
+ let table_path = "memory:/test_df_insert_overwrite";
+ file_io
+ .mkdirs(&format!("{table_path}/snapshot/"))
+ .await
+ .unwrap();
+ file_io
+ .mkdirs(&format!("{table_path}/manifest/"))
+ .await
+ .unwrap();
+
+ let schema = PaimonSchema::builder()
+ .column("pt", DataType::VarChar(VarCharType::string_type()))
+ .column("id", DataType::Int(IntType::new()))
+ .partition_keys(["pt"])
+ .build()
+ .unwrap();
+ let table_schema = TableSchema::new(0, &schema);
+ let table = paimon::table::Table::new(
+ file_io,
+ Identifier::new("default", "test_overwrite"),
+ table_path.to_string(),
+ table_schema,
+ None,
+ );
+
+ let provider = PaimonTableProvider::try_new(table).unwrap();
+ let ctx = SessionContext::new();
+ ctx.register_table("t", Arc::new(provider)).unwrap();
+
+ // Initial INSERT: partition "a" and "b"
+ ctx.sql("INSERT INTO t VALUES ('a', 1), ('a', 2), ('b', 3), ('b', 4)")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ // INSERT OVERWRITE with only partition "a" data
+ // Should overwrite partition "a" but leave partition "b" intact
+ ctx.sql("INSERT OVERWRITE t VALUES ('a', 10), ('a', 20)")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ // Read back
+ let batches = ctx
+ .sql("SELECT pt, id FROM t ORDER BY pt, id")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ let mut rows = Vec::new();
+ for batch in &batches {
+ let pts = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<datafusion::arrow::array::StringArray>()
+ .unwrap();
+ let ids = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<datafusion::arrow::array::Int32Array>()
+ .unwrap();
+ for i in 0..batch.num_rows() {
+ rows.push((pts.value(i).to_string(), ids.value(i)));
+ }
+ }
+ // Partition "a" overwritten with new data, partition "b" untouched
+ assert_eq!(
+ rows,
+ vec![
+ ("a".to_string(), 10),
+ ("a".to_string(), 20),
+ ("b".to_string(), 3),
+ ("b".to_string(), 4),
+ ]
+ );
+ }
+
+ #[tokio::test]
+ async fn test_insert_overwrite_unpartitioned() {
+ use paimon::io::FileIOBuilder;
+ use paimon::spec::{DataType, IntType, Schema as PaimonSchema,
TableSchema};
+
+ let file_io = FileIOBuilder::new("memory").build().unwrap();
+ let table_path = "memory:/test_df_insert_overwrite_unpart";
+ file_io
+ .mkdirs(&format!("{table_path}/snapshot/"))
+ .await
+ .unwrap();
+ file_io
+ .mkdirs(&format!("{table_path}/manifest/"))
+ .await
+ .unwrap();
+
+ let schema = PaimonSchema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .build()
+ .unwrap();
+ let table_schema = TableSchema::new(0, &schema);
+ let table = paimon::table::Table::new(
+ file_io,
+ Identifier::new("default", "test_overwrite_unpart"),
+ table_path.to_string(),
+ table_schema,
+ None,
+ );
+
+ let provider = PaimonTableProvider::try_new(table).unwrap();
+ let ctx = SessionContext::new();
+ ctx.register_table("t", Arc::new(provider)).unwrap();
+
+ // Initial INSERT
+ ctx.sql("INSERT INTO t VALUES (1, 10), (2, 20), (3, 30)")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ // INSERT OVERWRITE on unpartitioned table — full table overwrite
+ ctx.sql("INSERT OVERWRITE t VALUES (4, 40), (5, 50)")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ let batches = ctx
+ .sql("SELECT id, value FROM t ORDER BY id")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ let mut rows = Vec::new();
+ for batch in &batches {
+ let ids = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<datafusion::arrow::array::Int32Array>()
+ .unwrap();
+ let vals = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<datafusion::arrow::array::Int32Array>()
+ .unwrap();
+ for i in 0..batch.num_rows() {
+ rows.push((ids.value(i), vals.value(i)));
+ }
+ }
+ // Old data fully replaced
+ assert_eq!(rows, vec![(4, 40), (5, 50)]);
+ }
}
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 10e5fa0..908781f 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -64,8 +64,10 @@ arrow-buffer = { workspace = true }
arrow-cast = { workspace = true }
arrow-ord = { workspace = true }
arrow-schema = { workspace = true }
+arrow-select = { workspace = true }
futures = "0.3"
-parquet = { workspace = true, features = ["async", "zstd"] }
+tokio-util = { workspace = true, features = ["compat"] }
+parquet = { workspace = true, features = ["async", "zstd", "lz4", "snap"] }
orc-rust = "0.7.0"
async-stream = "0.3.6"
reqwest = { version = "0.12", features = ["json"] }
diff --git a/crates/paimon/src/arrow/format/mod.rs
b/crates/paimon/src/arrow/format/mod.rs
index 63fd4e0..454e621 100644
--- a/crates/paimon/src/arrow/format/mod.rs
+++ b/crates/paimon/src/arrow/format/mod.rs
@@ -19,10 +19,12 @@ mod avro;
mod orc;
mod parquet;
-use crate::io::FileRead;
+use crate::io::{FileRead, OutputFile};
use crate::spec::{DataField, Predicate};
use crate::table::{ArrowRecordBatchStream, RowRange};
use crate::Error;
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
use async_trait::async_trait;
/// Predicates with the file-level field context needed for pushdown.
@@ -58,6 +60,30 @@ pub(crate) trait FormatFileReader: Send + Sync {
) -> crate::Result<ArrowRecordBatchStream>;
}
+/// Format-agnostic file writer that streams Arrow RecordBatches directly to
storage.
+///
+/// Each implementation (Parquet, ORC, ...) handles format-specific encoding.
+/// Usage: create via [`create_format_writer`], call
[`write`](FormatFileWriter::write)
+/// for each batch, then [`close`](FormatFileWriter::close) to finalize the
file.
+#[async_trait]
+pub(crate) trait FormatFileWriter: Send {
+ /// Write a RecordBatch to the underlying storage.
+ async fn write(&mut self, batch: &RecordBatch) -> crate::Result<()>;
+
+ /// Approximate file size so far: bytes already flushed to storage plus bytes buffered in the current row group (before close).
+ fn num_bytes(&self) -> usize;
+
+ /// Number of bytes buffered in the current row group (not yet flushed).
+ fn in_progress_size(&self) -> usize;
+
+ /// Flush the current row group to storage without closing the file.
+ async fn flush(&mut self) -> crate::Result<()>;
+
+ /// Flush and close the writer, finalizing the file on storage.
+ /// Returns the total number of bytes written.
+ async fn close(self: Box<Self>) -> crate::Result<u64>;
+}
+
/// Create a format reader based on the file extension.
pub(crate) fn create_format_reader(path: &str) -> crate::Result<Box<dyn
FormatFileReader>> {
if path.to_ascii_lowercase().ends_with(".parquet") {
@@ -74,3 +100,22 @@ pub(crate) fn create_format_reader(path: &str) ->
crate::Result<Box<dyn FormatFi
})
}
}
+
+/// Create a format writer that streams directly to storage.
+pub(crate) async fn create_format_writer(
+ output: &OutputFile,
+ schema: SchemaRef,
+ compression: &str,
+ zstd_level: i32,
+) -> crate::Result<Box<dyn FormatFileWriter>> {
+ let path = output.location();
+ if path.to_ascii_lowercase().ends_with(".parquet") {
+ Ok(Box::new(
+ parquet::ParquetFormatWriter::new(output, schema, compression,
zstd_level).await?,
+ ))
+ } else {
+ Err(Error::Unsupported {
+ message: format!("unsupported write format: expected .parquet,
got: {path}"),
+ })
+ }
+}
diff --git a/crates/paimon/src/arrow/format/parquet.rs
b/crates/paimon/src/arrow/format/parquet.rs
index b0aa0ec..74de8a2 100644
--- a/crates/paimon/src/arrow/format/parquet.rs
+++ b/crates/paimon/src/arrow/format/parquet.rs
@@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-use super::{FilePredicates, FormatFileReader};
+use super::{FilePredicates, FormatFileReader, FormatFileWriter};
use crate::arrow::filtering::{predicates_may_match_with_schema, StatsAccessor};
-use crate::io::FileRead;
+use crate::io::{FileRead, OutputFile};
use crate::spec::{DataField, DataType, Datum, Predicate, PredicateOperator};
use crate::table::{ArrowRecordBatchStream, RowRange};
use crate::Error;
@@ -38,9 +38,11 @@ use parquet::arrow::arrow_reader::{
ArrowPredicate, ArrowPredicateFn, ArrowReaderOptions, RowFilter,
RowSelection, RowSelector,
};
use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
-use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder,
ProjectionMask};
+use parquet::basic::{Compression, ZstdLevel};
use parquet::file::metadata::ParquetMetaDataReader;
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics as ParquetStatistics;
use std::collections::HashMap;
use std::ops::Range;
@@ -48,6 +50,89 @@ use std::sync::Arc;
pub(crate) struct ParquetFormatReader;
+/// Parquet implementation of [`FormatFileWriter`].
+/// Streams data directly to storage via `AsyncArrowWriter` + opendal.
+pub(crate) struct ParquetFormatWriter {
+ inner: AsyncArrowWriter<Box<dyn crate::io::AsyncFileWrite>>,
+}
+
+impl ParquetFormatWriter {
+ pub(crate) async fn new(
+ output: &OutputFile,
+ schema: arrow_schema::SchemaRef,
+ compression: &str,
+ zstd_level: i32,
+ ) -> crate::Result<Self> {
+ let async_write = output.async_writer().await?;
+ let codec = parse_compression(compression, zstd_level);
+ let props = WriterProperties::builder().set_compression(codec).build();
+ let inner = AsyncArrowWriter::try_new(async_write, schema,
Some(props)).map_err(|e| {
+ crate::Error::DataInvalid {
+ message: format!("Failed to create parquet writer: {e}"),
+ source: None,
+ }
+ })?;
+ Ok(Self { inner })
+ }
+}
+
+/// Map Paimon `file.compression` value to parquet [`Compression`].
+fn parse_compression(codec: &str, zstd_level: i32) -> Compression {
+ match codec.to_ascii_lowercase().as_str() {
+ "zstd" => {
+ let level = ZstdLevel::try_new(zstd_level).unwrap_or_default();
+ Compression::ZSTD(level)
+ }
+ "lz4" => Compression::LZ4_RAW,
+ "snappy" => Compression::SNAPPY,
+ "gzip" | "gz" => Compression::GZIP(Default::default()),
+ "none" | "uncompressed" => Compression::UNCOMPRESSED,
+ _ => Compression::UNCOMPRESSED,
+ }
+}
+
+#[async_trait]
+impl FormatFileWriter for ParquetFormatWriter {
+ async fn write(&mut self, batch: &RecordBatch) -> crate::Result<()> {
+ self.inner
+ .write(batch)
+ .await
+ .map_err(|e| crate::Error::DataInvalid {
+ message: format!("Failed to write parquet batch: {e}"),
+ source: None,
+ })
+ }
+
+ fn num_bytes(&self) -> usize {
+ self.inner.bytes_written() + self.inner.in_progress_size()
+ }
+
+ fn in_progress_size(&self) -> usize {
+ self.inner.in_progress_size()
+ }
+
+ async fn flush(&mut self) -> crate::Result<()> {
+ self.inner
+ .flush()
+ .await
+ .map_err(|e| crate::Error::DataInvalid {
+ message: format!("Failed to flush parquet writer: {e}"),
+ source: None,
+ })
+ }
+
+ async fn close(mut self: Box<Self>) -> crate::Result<u64> {
+ self.inner
+ .finish()
+ .await
+ .map_err(|e| crate::Error::DataInvalid {
+ message: format!("Failed to close parquet writer: {e}"),
+ source: None,
+ })?;
+ Ok(self.inner.bytes_written() as u64)
+ }
+}
+
#[async_trait]
impl FormatFileReader for ParquetFormatReader {
async fn read_batch_stream(
@@ -1050,7 +1135,12 @@ fn split_ranges_for_concurrency(merged: Vec<Range<u64>>,
concurrency: usize) ->
#[cfg(test)]
mod tests {
use super::build_parquet_row_filter;
+ use super::ParquetFormatWriter;
+ use crate::arrow::format::FormatFileWriter;
+ use crate::io::FileIOBuilder;
use crate::spec::{DataField, DataType, Datum, IntType, PredicateBuilder};
+ use arrow_array::{Int32Array, RecordBatch};
+ use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema
as ArrowSchema};
use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
use std::sync::Arc;
@@ -1241,4 +1331,81 @@ mod tests {
let result = super::split_ranges_for_concurrency(merged, 4);
assert!(result.is_empty());
}
+
+ fn writer_arrow_schema() -> Arc<ArrowSchema> {
+ Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("id", ArrowDataType::Int32, false),
+ ArrowField::new("value", ArrowDataType::Int32, false),
+ ]))
+ }
+
+ fn writer_test_batch(
+ schema: &Arc<ArrowSchema>,
+ ids: Vec<i32>,
+ values: Vec<i32>,
+ ) -> RecordBatch {
+ RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(Int32Array::from(ids)),
+ Arc::new(Int32Array::from(values)),
+ ],
+ )
+ .unwrap()
+ }
+
+ #[tokio::test]
+ async fn test_parquet_writer_write_and_close() {
+ let file_io = FileIOBuilder::new("memory").build().unwrap();
+ let path = "memory:/test_parquet_writer_write_close.parquet";
+ let output = file_io.new_output(path).unwrap();
+ let schema = writer_arrow_schema();
+
+ let mut writer: Box<dyn FormatFileWriter> = Box::new(
+ ParquetFormatWriter::new(&output, schema.clone(), "zstd", 1)
+ .await
+ .unwrap(),
+ );
+
+ let batch = writer_test_batch(&schema, vec![1, 2, 3], vec![10, 20,
30]);
+ writer.write(&batch).await.unwrap();
+ writer.close().await.unwrap();
+
+ // Verify valid parquet by reading back
+ let bytes = file_io.new_input(path).unwrap().read().await.unwrap();
+ let reader =
+
parquet::arrow::arrow_reader::ParquetRecordBatchReader::try_new(bytes,
1024).unwrap();
+ let total_rows: usize = reader.into_iter().map(|r|
r.unwrap().num_rows()).sum();
+ assert_eq!(total_rows, 3);
+ }
+
+ #[tokio::test]
+ async fn test_parquet_writer_multiple_batches() {
+ let file_io = FileIOBuilder::new("memory").build().unwrap();
+ let path = "memory:/test_parquet_writer_multi.parquet";
+ let output = file_io.new_output(path).unwrap();
+ let schema = writer_arrow_schema();
+
+ let mut writer: Box<dyn FormatFileWriter> = Box::new(
+ ParquetFormatWriter::new(&output, schema.clone(), "zstd", 1)
+ .await
+ .unwrap(),
+ );
+
+ writer
+ .write(&writer_test_batch(&schema, vec![1, 2], vec![10, 20]))
+ .await
+ .unwrap();
+ writer
+ .write(&writer_test_batch(&schema, vec![3, 4, 5], vec![30, 40,
50]))
+ .await
+ .unwrap();
+ writer.close().await.unwrap();
+
+ let bytes = file_io.new_input(path).unwrap().read().await.unwrap();
+ let reader =
+
parquet::arrow::arrow_reader::ParquetRecordBatchReader::try_new(bytes,
1024).unwrap();
+ let total_rows: usize = reader.into_iter().map(|r|
r.unwrap().num_rows()).sum();
+ assert_eq!(total_rows, 5);
+ }
}
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index 6f41f11..93758e8 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -26,6 +26,7 @@ use chrono::{DateTime, Utc};
use opendal::raw::normalize_root;
use opendal::Operator;
use snafu::ResultExt;
+use tokio_util::compat::FuturesAsyncWriteCompatExt;
use url::Url;
use super::Storage;
@@ -309,6 +310,11 @@ impl FileWrite for opendal::Writer {
}
}
+/// Async streaming writer trait for format-level writers (e.g. parquet).
+pub trait AsyncFileWrite: tokio::io::AsyncWrite + Unpin + Send {}
+
+impl<T: tokio::io::AsyncWrite + Unpin + Send> AsyncFileWrite for T {}
+
#[derive(Clone, Debug)]
pub struct FileStatus {
pub size: u64,
@@ -390,10 +396,22 @@ impl OutputFile {
}
pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
+ Ok(Box::new(self.opendal_writer().await?))
+ }
+
+ /// Get an async streaming writer for format-level writes (e.g. parquet).
+ pub(crate) async fn async_writer(&self) -> crate::Result<Box<dyn
AsyncFileWrite>> {
Ok(Box::new(
- self.op.writer(&self.path[self.relative_path_pos..]).await?,
+ self.opendal_writer()
+ .await?
+ .into_futures_async_write()
+ .compat_write(),
))
}
+
+ async fn opendal_writer(&self) -> crate::Result<opendal::Writer> {
+ Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?)
+ }
}
#[cfg(test)]
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index 3867d69..b322347 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -44,5 +44,6 @@ pub use catalog::FileSystemCatalog;
pub use table::{
CommitMessage, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket,
Plan, RESTEnv,
RESTSnapshotCommit, ReadBuilder, RenamingSnapshotCommit, RowRange,
SnapshotCommit,
- SnapshotManager, Table, TableCommit, TableRead, TableScan, TagManager,
WriteBuilder,
+ SnapshotManager, Table, TableCommit, TableRead, TableScan, TableWrite,
TagManager,
+ WriteBuilder,
};
diff --git a/crates/paimon/src/spec/binary_row.rs
b/crates/paimon/src/spec/binary_row.rs
index 0ba6b78..6599484 100644
--- a/crates/paimon/src/spec/binary_row.rs
+++ b/crates/paimon/src/spec/binary_row.rs
@@ -20,10 +20,15 @@
use crate::spec::murmur_hash::hash_by_words;
use crate::spec::{DataType, Datum};
+use arrow_array::RecordBatch;
use serde::{Deserialize, Serialize};
+use std::sync::LazyLock;
pub const EMPTY_BINARY_ROW: BinaryRow = BinaryRow::new(0);
+pub static EMPTY_SERIALIZED_ROW: LazyLock<Vec<u8>> =
+ LazyLock::new(|| BinaryRowBuilder::new(0).build_serialized());
+
/// Highest bit mask for detecting inline vs variable-length encoding.
const HIGHEST_FIRST_BIT: u64 = 0x80 << 56;
@@ -276,82 +281,28 @@ impl BinaryRow {
}
/// Build a BinaryRow from typed Datum values using `BinaryRowBuilder`.
- pub fn from_datums(datums: &[(&crate::spec::Datum,
&crate::spec::DataType)]) -> Option<Self> {
+ /// `None` entries are written as null fields.
+ pub fn from_datums(datums: &[(Option<&crate::spec::Datum>,
&crate::spec::DataType)]) -> Self {
let arity = datums.len() as i32;
let mut builder = BinaryRowBuilder::new(arity);
- for (pos, (datum, data_type)) in datums.iter().enumerate() {
- match datum {
- crate::spec::Datum::Bool(v) => builder.write_boolean(pos, *v),
- crate::spec::Datum::TinyInt(v) => builder.write_byte(pos, *v),
- crate::spec::Datum::SmallInt(v) => builder.write_short(pos,
*v),
- crate::spec::Datum::Int(v)
- | crate::spec::Datum::Date(v)
- | crate::spec::Datum::Time(v) => builder.write_int(pos, *v),
- crate::spec::Datum::Long(v) => builder.write_long(pos, *v),
- crate::spec::Datum::Float(v) => builder.write_float(pos, *v),
- crate::spec::Datum::Double(v) => builder.write_double(pos, *v),
- crate::spec::Datum::Timestamp { millis, nanos } => {
- let precision = match data_type {
- crate::spec::DataType::Timestamp(ts) => ts.precision(),
- _ => 3,
- };
- if precision <= 3 {
- builder.write_timestamp_compact(pos, *millis);
- } else {
- builder.write_timestamp_non_compact(pos, *millis,
*nanos);
- }
- }
- crate::spec::Datum::LocalZonedTimestamp { millis, nanos } => {
- let precision = match data_type {
- crate::spec::DataType::LocalZonedTimestamp(ts) =>
ts.precision(),
- _ => 3,
- };
- if precision <= 3 {
- builder.write_timestamp_compact(pos, *millis);
- } else {
- builder.write_timestamp_non_compact(pos, *millis,
*nanos);
- }
- }
- crate::spec::Datum::Decimal {
- unscaled,
- precision,
- ..
- } => {
- if *precision <= 18 {
- builder.write_decimal_compact(pos, *unscaled as i64);
- } else {
- builder.write_decimal_var_len(pos, *unscaled);
- }
- }
- crate::spec::Datum::String(s) => {
- if s.len() <= 7 {
- builder.write_string_inline(pos, s);
- } else {
- builder.write_string(pos, s);
- }
- }
- crate::spec::Datum::Bytes(b) => {
- if b.len() <= 7 {
- builder.write_binary_inline(pos, b);
- } else {
- builder.write_binary(pos, b);
- }
- }
+ for (pos, (datum_opt, data_type)) in datums.iter().enumerate() {
+ match datum_opt {
+ Some(datum) => builder.write_datum(pos, datum, data_type),
+ None => builder.set_null_at(pos),
}
}
- let row = builder.build();
- Some(row)
+ builder.build()
}
pub fn compute_bucket_from_datums(
- datums: &[(&crate::spec::Datum, &crate::spec::DataType)],
+ datums: &[(Option<&crate::spec::Datum>, &crate::spec::DataType)],
total_buckets: i32,
- ) -> Option<i32> {
- let row = Self::from_datums(datums)?;
+ ) -> i32 {
+ let row = Self::from_datums(datums);
let hash = row.hash_code();
- Some((hash % total_buckets).abs())
+ (hash % total_buckets).abs()
}
}
@@ -607,6 +558,169 @@ pub fn datums_to_binary_row(datums: &[(&Option<Datum>,
&DataType)]) -> Vec<u8> {
builder.build_serialized()
}
+/// Extract a Datum from an Arrow RecordBatch column at the given row index.
+pub fn extract_datum_from_arrow(
+ batch: &RecordBatch,
+ row_idx: usize,
+ col_idx: usize,
+ data_type: &DataType,
+) -> crate::Result<Option<Datum>> {
+ use arrow_array::Array;
+
+ let col = batch.column(col_idx);
+ if col.is_null(row_idx) {
+ return Ok(None);
+ }
+
+ let datum = match data_type {
+ DataType::Boolean(_) => {
+ let arr = col
+ .as_any()
+ .downcast_ref::<arrow_array::BooleanArray>()
+ .ok_or_else(|| type_mismatch_err("Boolean", col_idx))?;
+ Datum::Bool(arr.value(row_idx))
+ }
+ DataType::TinyInt(_) => {
+ let arr = col
+ .as_any()
+ .downcast_ref::<arrow_array::Int8Array>()
+ .ok_or_else(|| type_mismatch_err("TinyInt", col_idx))?;
+ Datum::TinyInt(arr.value(row_idx))
+ }
+ DataType::SmallInt(_) => {
+ let arr = col
+ .as_any()
+ .downcast_ref::<arrow_array::Int16Array>()
+ .ok_or_else(|| type_mismatch_err("SmallInt", col_idx))?;
+ Datum::SmallInt(arr.value(row_idx))
+ }
+ DataType::Int(_) => {
+ let arr = col
+ .as_any()
+ .downcast_ref::<arrow_array::Int32Array>()
+ .ok_or_else(|| type_mismatch_err("Int", col_idx))?;
+ Datum::Int(arr.value(row_idx))
+ }
+ DataType::BigInt(_) => {
+ let arr = col
+ .as_any()
+ .downcast_ref::<arrow_array::Int64Array>()
+ .ok_or_else(|| type_mismatch_err("BigInt", col_idx))?;
+ Datum::Long(arr.value(row_idx))
+ }
+ DataType::Float(_) => {
+ let arr = col
+ .as_any()
+ .downcast_ref::<arrow_array::Float32Array>()
+ .ok_or_else(|| type_mismatch_err("Float", col_idx))?;
+ Datum::Float(arr.value(row_idx))
+ }
+ DataType::Double(_) => {
+ let arr = col
+ .as_any()
+ .downcast_ref::<arrow_array::Float64Array>()
+ .ok_or_else(|| type_mismatch_err("Double", col_idx))?;
+ Datum::Double(arr.value(row_idx))
+ }
+ DataType::Char(_) | DataType::VarChar(_) => {
+ let arr = col
+ .as_any()
+ .downcast_ref::<arrow_array::StringArray>()
+ .ok_or_else(|| type_mismatch_err("String", col_idx))?;
+ Datum::String(arr.value(row_idx).to_string())
+ }
+ DataType::Date(_) => {
+ let arr = col
+ .as_any()
+ .downcast_ref::<arrow_array::Date32Array>()
+ .ok_or_else(|| type_mismatch_err("Date", col_idx))?;
+ Datum::Date(arr.value(row_idx))
+ }
+ DataType::Decimal(d) => {
+ let arr = col
+ .as_any()
+ .downcast_ref::<arrow_array::Decimal128Array>()
+ .ok_or_else(|| type_mismatch_err("Decimal", col_idx))?;
+ Datum::Decimal {
+ unscaled: arr.value(row_idx),
+ precision: d.precision(),
+ scale: d.scale(),
+ }
+ }
+ DataType::Binary(_) | DataType::VarBinary(_) => {
+ let arr = col
+ .as_any()
+ .downcast_ref::<arrow_array::BinaryArray>()
+ .ok_or_else(|| type_mismatch_err("Binary", col_idx))?;
+ Datum::Bytes(arr.value(row_idx).to_vec())
+ }
+ DataType::Timestamp(ts) => {
+ if ts.precision() <= 3 {
+ let arr = col
+ .as_any()
+ .downcast_ref::<arrow_array::TimestampMillisecondArray>()
+ .ok_or_else(|| type_mismatch_err("Timestamp(ms)",
col_idx))?;
+ Datum::Timestamp {
+ millis: arr.value(row_idx),
+ nanos: 0,
+ }
+ } else {
+ let arr = col
+ .as_any()
+ .downcast_ref::<arrow_array::TimestampMicrosecondArray>()
+ .ok_or_else(|| type_mismatch_err("Timestamp(us)",
col_idx))?;
+ let micros = arr.value(row_idx);
+ Datum::Timestamp {
+ millis: micros / 1000,
+ nanos: ((micros % 1000) * 1000) as i32,
+ }
+ }
+ }
+ DataType::LocalZonedTimestamp(ts) => {
+ if ts.precision() <= 3 {
+ let arr = col
+ .as_any()
+ .downcast_ref::<arrow_array::TimestampMillisecondArray>()
+ .ok_or_else(||
type_mismatch_err("LocalZonedTimestamp(ms)", col_idx))?;
+ Datum::LocalZonedTimestamp {
+ millis: arr.value(row_idx),
+ nanos: 0,
+ }
+ } else {
+ let arr = col
+ .as_any()
+ .downcast_ref::<arrow_array::TimestampMicrosecondArray>()
+ .ok_or_else(||
type_mismatch_err("LocalZonedTimestamp(us)", col_idx))?;
+ let micros = arr.value(row_idx);
+ Datum::LocalZonedTimestamp {
+ millis: micros / 1000,
+ nanos: ((micros % 1000) * 1000) as i32,
+ }
+ }
+ }
+ _ => {
+ return Err(crate::Error::Unsupported {
+ message: format!(
+ "Unsupported data type {:?} for Arrow extraction at column
{}",
+ data_type, col_idx
+ ),
+ });
+ }
+ };
+
+ Ok(Some(datum))
+}
+
+fn type_mismatch_err(expected: &str, col_idx: usize) -> crate::Error {
+ crate::Error::DataInvalid {
+ message: format!(
+ "Arrow column {} type mismatch: expected {} compatible array",
+ col_idx, expected
+ ),
+ source: None,
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/crates/paimon/src/spec/core_options.rs
b/crates/paimon/src/spec/core_options.rs
index 17993d9..bb52022 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -27,12 +27,15 @@ const PARTITION_LEGACY_NAME_OPTION: &str =
"partition.legacy-name";
const BUCKET_KEY_OPTION: &str = "bucket-key";
const BUCKET_FUNCTION_TYPE_OPTION: &str = "bucket-function.type";
const BUCKET_OPTION: &str = "bucket";
-const DEFAULT_BUCKET: i32 = 1;
+const DEFAULT_BUCKET: i32 = -1;
const COMMIT_MAX_RETRIES_OPTION: &str = "commit.max-retries";
const COMMIT_TIMEOUT_OPTION: &str = "commit.timeout";
const COMMIT_MIN_RETRY_WAIT_OPTION: &str = "commit.min-retry-wait";
const COMMIT_MAX_RETRY_WAIT_OPTION: &str = "commit.max-retry-wait";
+const FILE_COMPRESSION_OPTION: &str = "file.compression";
+const FILE_COMPRESSION_ZSTD_LEVEL_OPTION: &str = "file.compression.zstd-level";
const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled";
+const WRITE_PARQUET_BUFFER_SIZE_OPTION: &str = "write.parquet-buffer-size";
const DEFAULT_COMMIT_MAX_RETRIES: u32 = 10;
const DEFAULT_COMMIT_TIMEOUT_MS: u64 = 120_000;
const DEFAULT_COMMIT_MIN_RETRY_WAIT_MS: u64 = 1_000;
@@ -43,6 +46,8 @@ pub const SCAN_TAG_NAME_OPTION: &str = "scan.tag-name";
const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
+const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024;
+const DEFAULT_WRITE_PARQUET_BUFFER_SIZE: i64 = 256 * 1024 * 1024;
/// Typed accessors for common table options.
///
@@ -264,6 +269,41 @@ impl<'a> CoreOptions<'a> {
.map(|v| v.eq_ignore_ascii_case("default"))
.unwrap_or(true)
}
+
+ /// Target file size for data files. Default is 256MB.
+ pub fn target_file_size(&self) -> i64 {
+ self.options
+ .get("target-file-size")
+ .and_then(|v| parse_memory_size(v))
+ .unwrap_or(DEFAULT_TARGET_FILE_SIZE)
+ }
+
+ /// File compression codec (e.g. "zstd", "lz4", "snappy", "gzip", "none"/"uncompressed").
+ /// Default is "zstd".
+ pub fn file_compression(&self) -> &str {
+ self.options
+ .get(FILE_COMPRESSION_OPTION)
+ .map(String::as_str)
+ .unwrap_or("zstd")
+ }
+
+ /// Zstd compression level. Only meaningful when `file.compression` is
`"zstd"`.
+ /// Default is 1 (matching Paimon Java).
+ pub fn file_compression_zstd_level(&self) -> i32 {
+ self.options
+ .get(FILE_COMPRESSION_ZSTD_LEVEL_OPTION)
+ .and_then(|v| v.parse().ok())
+ .unwrap_or(1)
+ }
+
+ /// Parquet writer in-progress buffer size limit. Default is 256MB.
+ /// When the buffered data exceeds this, the writer flushes the current
row group.
+ pub fn write_parquet_buffer_size(&self) -> i64 {
+ self.options
+ .get(WRITE_PARQUET_BUFFER_SIZE_OPTION)
+ .and_then(|v| parse_memory_size(v))
+ .unwrap_or(DEFAULT_WRITE_PARQUET_BUFFER_SIZE)
+ }
}
/// Parse a memory size string to bytes using binary (1024-based) semantics.
@@ -421,7 +461,7 @@ mod tests {
fn test_commit_options_defaults() {
let options = HashMap::new();
let core = CoreOptions::new(&options);
- assert_eq!(core.bucket(), 1);
+ assert_eq!(core.bucket(), -1);
assert_eq!(core.commit_max_retries(), 10);
assert_eq!(core.commit_timeout_ms(), 120_000);
assert_eq!(core.commit_min_retry_wait_ms(), 1_000);
@@ -477,4 +517,21 @@ mod tests {
Some(TimeTravelSelector::TimestampMillis(1234))
);
}
+
+ #[test]
+ fn test_write_options_defaults() {
+ let options = HashMap::new();
+ let core = CoreOptions::new(&options);
+ assert_eq!(core.write_parquet_buffer_size(), 256 * 1024 * 1024);
+ }
+
+ #[test]
+ fn test_write_options_custom() {
+ let options = HashMap::from([(
+ WRITE_PARQUET_BUFFER_SIZE_OPTION.to_string(),
+ "32mb".to_string(),
+ )]);
+ let core = CoreOptions::new(&options);
+ assert_eq!(core.write_parquet_buffer_size(), 32 * 1024 * 1024);
+ }
}
diff --git a/crates/paimon/src/spec/partition_utils.rs
b/crates/paimon/src/spec/partition_utils.rs
index fa41cfc..5d05863 100644
--- a/crates/paimon/src/spec/partition_utils.rs
+++ b/crates/paimon/src/spec/partition_utils.rs
@@ -38,6 +38,7 @@ const MILLIS_PER_DAY: i64 = 86_400_000;
/// (escaped directory path).
///
/// Reference: `org.apache.paimon.utils.InternalRowPartitionComputer` in Java
Paimon.
+#[derive(Debug)]
pub(crate) struct PartitionComputer {
partition_keys: Vec<String>,
partition_fields: Vec<DataField>,
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index 62ab3fe..b0baf66 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::spec::core_options::CoreOptions;
use crate::spec::types::{DataType, RowType};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
@@ -115,6 +116,25 @@ impl TableSchema {
pub fn time_millis(&self) -> i64 {
self.time_millis
}
+
+ /// Compute the effective bucket key columns.
+ ///
+ /// Priority: explicit `bucket-key` option > primary keys > all
non-partition fields.
+ pub fn bucket_keys(&self) -> Vec<String> {
+ let core_options = CoreOptions::new(&self.options);
+ if let Some(keys) = core_options.bucket_key() {
+ return keys;
+ }
+ if !self.primary_keys.is_empty() {
+ return self.primary_keys.clone();
+ }
+ let partition_set: HashSet<&str> =
self.partition_keys.iter().map(String::as_str).collect();
+ self.fields
+ .iter()
+ .filter(|f| !partition_set.contains(f.name()))
+ .map(|f| f.name().to_string())
+ .collect()
+ }
}
pub const ROW_ID_FIELD_NAME: &str = "_ROW_ID";
diff --git a/crates/paimon/src/table/bucket_filter.rs
b/crates/paimon/src/table/bucket_filter.rs
index ea300df..80942d3 100644
--- a/crates/paimon/src/table/bucket_filter.rs
+++ b/crates/paimon/src/table/bucket_filter.rs
@@ -101,14 +101,14 @@ pub(super) fn compute_target_buckets(
}
// Collect equal-value candidates per bucket key field (by projected
index).
- // Each field can have one value (Eq) or multiple values (In).
+ // Each field can have one value (Eq), multiple values (In), or NULL
(IsNull).
let num_keys = bucket_key_fields.len();
- let mut field_candidates: Vec<Option<Vec<&Datum>>> = vec![None; num_keys];
+ let mut field_candidates: Vec<Option<Vec<Option<&Datum>>>> = vec![None;
num_keys];
collect_eq_candidates(bucket_predicate, &mut field_candidates);
// All bucket key fields must have candidates.
- let candidates: Vec<&Vec<&Datum>> =
+ let candidates: Vec<&Vec<Option<&Datum>>> =
field_candidates.iter().filter_map(|c| c.as_ref()).collect();
if candidates.len() != num_keys {
return None;
@@ -118,18 +118,15 @@ pub(super) fn compute_target_buckets(
let mut buckets = HashSet::new();
let mut combo: Vec<usize> = vec![0; num_keys];
loop {
- let datums: Vec<(&Datum, &DataType)> = (0..num_keys)
+ let datums: Vec<(Option<&Datum>, &DataType)> = (0..num_keys)
.map(|i| {
let vals = field_candidates[i].as_ref().unwrap();
(vals[combo[i]], bucket_key_fields[i].data_type())
})
.collect();
- if let Some(bucket) = BinaryRow::compute_bucket_from_datums(&datums,
total_buckets) {
- buckets.insert(bucket);
- } else {
- return None;
- }
+ let bucket = BinaryRow::compute_bucket_from_datums(&datums,
total_buckets);
+ buckets.insert(bucket);
// Advance the combination counter (rightmost first).
let mut carry = true;
@@ -155,10 +152,10 @@ pub(super) fn compute_target_buckets(
}
}
-/// Recursively collect Eq/In literal candidates from a predicate for each
bucket key field.
+/// Recursively collect Eq/In/IsNull literal candidates from a predicate for
each bucket key field.
fn collect_eq_candidates<'a>(
predicate: &'a Predicate,
- field_candidates: &mut Vec<Option<Vec<&'a Datum>>>,
+ field_candidates: &mut Vec<Option<Vec<Option<&'a Datum>>>>,
) {
match predicate {
Predicate::And(children) => {
@@ -176,14 +173,17 @@ fn collect_eq_candidates<'a>(
match op {
PredicateOperator::Eq => {
if let Some(lit) = literals.first() {
- field_candidates[*index] = Some(vec![lit]);
+ field_candidates[*index] = Some(vec![Some(lit)]);
}
}
PredicateOperator::In => {
if !literals.is_empty() {
- field_candidates[*index] =
Some(literals.iter().collect());
+ field_candidates[*index] =
Some(literals.iter().map(Some).collect());
}
}
+ PredicateOperator::IsNull => {
+ field_candidates[*index] = Some(vec![None]);
+ }
_ => {}
}
}
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index c17ebbc..2fbd0a8 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -33,6 +33,7 @@ mod source;
mod stats_filter;
pub(crate) mod table_commit;
mod table_scan;
+pub(crate) mod table_write;
mod tag_manager;
mod write_builder;
@@ -52,6 +53,7 @@ pub use source::{
};
pub use table_commit::TableCommit;
pub use table_scan::TableScan;
+pub use table_write::TableWrite;
pub use tag_manager::TagManager;
pub use write_builder::WriteBuilder;
diff --git a/crates/paimon/src/table/table_commit.rs
b/crates/paimon/src/table/table_commit.rs
index bc7f5c1..e14a4b7 100644
--- a/crates/paimon/src/table/table_commit.rs
+++ b/crates/paimon/src/table/table_commit.rs
@@ -48,21 +48,17 @@ pub struct TableCommit {
snapshot_commit: Arc<dyn SnapshotCommit>,
commit_user: String,
total_buckets: i32,
- overwrite_partition: Option<HashMap<String, Datum>>,
// commit config
commit_max_retries: u32,
commit_timeout_ms: u64,
commit_min_retry_wait_ms: u64,
commit_max_retry_wait_ms: u64,
row_tracking_enabled: bool,
+ partition_default_name: String,
}
impl TableCommit {
- pub fn new(
- table: Table,
- commit_user: String,
- overwrite_partition: Option<HashMap<String, Datum>>,
- ) -> Self {
+ pub fn new(table: Table, commit_user: String) -> Self {
let snapshot_manager = SnapshotManager::new(table.file_io.clone(),
table.location.clone());
let snapshot_commit = if let Some(env) = &table.rest_env {
env.snapshot_commit()
@@ -78,65 +74,124 @@ impl TableCommit {
let commit_min_retry_wait_ms = core_options.commit_min_retry_wait_ms();
let commit_max_retry_wait_ms = core_options.commit_max_retry_wait_ms();
let row_tracking_enabled = core_options.row_tracking_enabled();
+ let partition_default_name =
core_options.partition_default_name().to_string();
Self {
table,
snapshot_manager,
snapshot_commit,
commit_user,
total_buckets,
- overwrite_partition,
commit_max_retries,
commit_timeout_ms,
commit_min_retry_wait_ms,
commit_max_retry_wait_ms,
row_tracking_enabled,
+ partition_default_name,
}
}
- /// Commit new files. Uses OVERWRITE mode if overwrite_partition was set
- /// in the constructor, otherwise uses APPEND mode.
    /// Commit new files in APPEND mode.
    ///
    /// Returns `Ok(())` without producing a snapshot when `commit_messages`
    /// is empty.
    ///
    /// # Errors
    /// Propagates any failure from the underlying `try_commit` retry loop.
    pub async fn commit(&self, commit_messages: Vec<CommitMessage>) -> Result<()> {
        if commit_messages.is_empty() {
            return Ok(());
        }
        let commit_entries = self.messages_to_entries(&commit_messages);
        self.try_commit(
            CommitKind::APPEND,
            CommitEntriesPlan::Static(commit_entries),
        )
        .await
    }
- if let Some(overwrite_partition) = &self.overwrite_partition {
- let partition_predicate = if overwrite_partition.is_empty() {
- None
- } else {
- Some(self.build_partition_predicate(overwrite_partition)?)
- };
- self.try_commit(
- CommitKind::OVERWRITE,
- CommitEntriesPlan::Overwrite {
- partition_predicate,
- new_entries: commit_entries,
- },
- )
- .await
- } else {
- self.try_commit(
- CommitKind::APPEND,
- CommitEntriesPlan::Static(commit_entries),
- )
- .await
    /// Overwrite with dynamic partition detection.
    ///
    /// Extracts the set of partitions touched by `commit_messages` and overwrites
    /// only those partitions. For unpartitioned tables this is a full table overwrite.
    /// Returns `Ok(())` without producing a snapshot when `commit_messages` is empty.
    pub async fn overwrite(&self, commit_messages: Vec<CommitMessage>) -> Result<()> {
        if commit_messages.is_empty() {
            return Ok(());
        }

        let commit_entries = self.messages_to_entries(&commit_messages);
        // `None` predicate means "delete everything", i.e. full-table overwrite
        // (the unpartitioned case handled inside build_dynamic_partition_predicate).
        let partition_predicate = self.build_dynamic_partition_predicate(&commit_messages)?;
        self.try_commit(
            CommitKind::OVERWRITE,
            CommitEntriesPlan::Overwrite {
                partition_predicate,
                new_entries: commit_entries,
            },
        )
        .await
    }
+
    /// Build a dynamic partition predicate from the partitions present in commit messages.
    ///
    /// Returns `None` for unpartitioned tables (full table overwrite).
    fn build_dynamic_partition_predicate(
        &self,
        commit_messages: &[CommitMessage],
    ) -> Result<Option<Predicate>> {
        let partition_fields = self.table.schema().partition_fields();
        if partition_fields.is_empty() {
            return Ok(None);
        }

        let data_types: Vec<_> = partition_fields
            .iter()
            .map(|f| f.data_type().clone())
            .collect();
        let partition_keys: Vec<_> = self
            .table
            .schema()
            .partition_keys()
            .iter()
            .map(|s| s.to_string())
            .collect();

        // Collect unique partition bytes: dedup on the serialized BinaryRow so each
        // touched partition contributes exactly one key/value spec.
        let mut seen = std::collections::HashSet::new();
        let mut partition_specs: Vec<HashMap<String, Option<Datum>>> = Vec::new();
        for msg in commit_messages {
            if seen.insert(msg.partition.clone()) {
                let row = BinaryRow::from_serialized_bytes(&msg.partition)?;
                let mut spec = HashMap::new();
                for (i, key) in partition_keys.iter().enumerate() {
                    // extract_datum yields None for NULL partition values; the
                    // predicate builder turns those into IS NULL below.
                    spec.insert(key.clone(), extract_datum(&row, i, &data_types[i])?);
                }
                partition_specs.push(spec);
            }
        }

        // One AND-of-equals (or IS NULL) predicate per touched partition,
        // OR-ed together to cover the whole overwritten set.
        let predicates: Vec<Predicate> = partition_specs
            .iter()
            .map(|p| self.build_partition_predicate(p))
            .collect::<Result<Vec<_>>>()?;

        Ok(Some(Predicate::or(predicates)))
    }
- /// Build a partition predicate from key-value pairs.
- fn build_partition_predicate(&self, partition: &HashMap<String, Datum>) ->
Result<Predicate> {
    /// Build a partition predicate from key-value pairs, handling NULL via IS NULL.
    ///
    /// A `None` value maps to an `IS NULL` leaf (equality against NULL would never
    /// match), so NULL partitions can be targeted precisely.
    fn build_partition_predicate(
        &self,
        partition: &HashMap<String, Option<Datum>>,
    ) -> Result<Predicate> {
        let pb = PredicateBuilder::new(&self.table.schema().partition_fields());
        let predicates: Vec<Predicate> = partition
            .iter()
            .map(|(key, value)| match value {
                Some(v) => pb.equal(key, v.clone()),
                None => pb.is_null(key),
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Predicate::and(predicates))
    }
/// Drop specific partitions (OVERWRITE with only deletes).
- pub async fn truncate_partitions(&self, partitions: Vec<HashMap<String,
Datum>>) -> Result<()> {
+ pub async fn truncate_partitions(
+ &self,
+ partitions: Vec<HashMap<String, Option<Datum>>>,
+ ) -> Result<()> {
if partitions.is_empty() {
return Ok(());
}
@@ -603,9 +658,11 @@ impl TableCommit {
}
let row = BinaryRow::from_serialized_bytes(partition_bytes)?;
for (i, key) in partition_keys.iter().enumerate() {
- if let Some(datum) = extract_datum(&row, i, &data_types[i])? {
- spec.insert(key.clone(), datum.to_string());
- }
+ let value = match extract_datum(&row, i, &data_types[i])? {
+ Some(datum) => datum.to_string(),
+ None => self.partition_default_name.clone(),
+ };
+ spec.insert(key.clone(), value);
}
Ok(spec)
}
@@ -747,20 +804,15 @@ mod tests {
fn setup_commit(file_io: &FileIO, table_path: &str) -> TableCommit {
let table = test_table(file_io, table_path);
- TableCommit::new(table, "test-user".to_string(), None)
+ TableCommit::new(table, "test-user".to_string())
}
fn setup_partitioned_commit(file_io: &FileIO, table_path: &str) ->
TableCommit {
let table = test_partitioned_table(file_io, table_path);
- TableCommit::new(table, "test-user".to_string(), None)
+ TableCommit::new(table, "test-user".to_string())
}
fn partition_bytes(pt: &str) -> Vec<u8> {
- use crate::spec::{DataType, VarCharType};
- let datum = Datum::String(pt.to_string());
- let dt = DataType::VarChar(VarCharType::string_type());
- let datums = vec![(&datum, &dt)];
- BinaryRow::from_datums(&datums).unwrap();
let mut builder = BinaryRowBuilder::new(1);
if pt.len() <= 7 {
builder.write_string_inline(0, pt);
@@ -923,16 +975,9 @@ mod tests {
.await
.unwrap();
- // Overwrite partition "a" with new data
- let mut overwrite_partition = HashMap::new();
- overwrite_partition.insert("pt".to_string(),
Datum::String("a".to_string()));
-
- let table = test_partitioned_table(&file_io, table_path);
- let overwrite_commit =
- TableCommit::new(table, "test-user".to_string(),
Some(overwrite_partition));
-
- overwrite_commit
- .commit(vec![CommitMessage::new(
+ // Overwrite partition "a" with new data (dynamic partition overwrite)
+ commit
+ .overwrite(vec![CommitMessage::new(
partition_bytes("a"),
0,
vec![test_data_file("data-a2.parquet", 50)],
@@ -980,8 +1025,8 @@ mod tests {
// Drop partitions "a" and "c"
let partitions = vec![
- HashMap::from([("pt".to_string(),
Datum::String("a".to_string()))]),
- HashMap::from([("pt".to_string(),
Datum::String("c".to_string()))]),
+ HashMap::from([("pt".to_string(),
Some(Datum::String("a".to_string())))]),
+ HashMap::from([("pt".to_string(),
Some(Datum::String("c".to_string())))]),
];
commit.truncate_partitions(partitions).await.unwrap();
@@ -992,4 +1037,58 @@ mod tests {
// 600 - 100 (a) - 300 (c) = 200
assert_eq!(snapshot.total_record_count(), Some(200));
}
+
    // Serialized single-column BinaryRow whose only field is NULL —
    // represents the NULL partition in the tests below.
    fn null_partition_bytes() -> Vec<u8> {
        let mut builder = BinaryRowBuilder::new(1);
        builder.set_null_at(0);
        builder.build_serialized()
    }
+
    // Verifies that dynamic-partition overwrite with a NULL partition value
    // targets only the NULL partition (exercising the IS NULL predicate path)
    // and leaves the non-NULL partitions untouched.
    #[tokio::test]
    async fn test_overwrite_null_partition() {
        let file_io = test_file_io();
        let table_path = "memory:/test_overwrite_null_partition";
        setup_dirs(&file_io, table_path).await;

        let commit = setup_partitioned_commit(&file_io, table_path);

        // Append data for partition "a", "b", and NULL
        commit
            .commit(vec![
                CommitMessage::new(
                    partition_bytes("a"),
                    0,
                    vec![test_data_file("data-a.parquet", 100)],
                ),
                CommitMessage::new(
                    partition_bytes("b"),
                    0,
                    vec![test_data_file("data-b.parquet", 200)],
                ),
                CommitMessage::new(
                    null_partition_bytes(),
                    0,
                    vec![test_data_file("data-null.parquet", 300)],
                ),
            ])
            .await
            .unwrap();

        // Overwrite NULL partition only — should NOT affect "a" or "b"
        commit
            .overwrite(vec![CommitMessage::new(
                null_partition_bytes(),
                0,
                vec![test_data_file("data-null2.parquet", 50)],
            )])
            .await
            .unwrap();

        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
        let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
        assert_eq!(snapshot.id(), 2);
        assert_eq!(snapshot.commit_kind(), &CommitKind::OVERWRITE);
        // 600 - 300 (delete null) + 50 (add null2) = 350
        assert_eq!(snapshot.total_record_count(), Some(350));
    }
}
diff --git a/crates/paimon/src/table/table_scan.rs
b/crates/paimon/src/table/table_scan.rs
index 06fe87a..1d2f621 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -1375,4 +1375,64 @@ mod tests {
let bucket = *buckets.iter().next().unwrap();
assert!((0..4).contains(&bucket));
}
+
    // An IsNull leaf on the single bucket-key column must pin the scan to
    // exactly one bucket: the bucket of a BinaryRow whose only field is NULL.
    #[test]
    fn test_compute_target_buckets_is_null() {
        let fields = bucket_key_fields();
        let pred = Predicate::Leaf {
            column: "id".into(),
            index: 0,
            data_type: DataType::Int(IntType::new()),
            op: PredicateOperator::IsNull,
            literals: vec![],
        };

        let buckets = compute_target_buckets(&pred, &fields, 4);
        assert!(buckets.is_some(), "IsNull should determine a target bucket");
        let buckets = buckets.unwrap();
        assert_eq!(buckets.len(), 1);
        let bucket = *buckets.iter().next().unwrap();
        assert!((0..4).contains(&bucket));

        // Verify it matches the expected bucket from a null BinaryRow
        let mut builder = BinaryRowBuilder::new(1);
        builder.set_null_at(0);
        let expected = (builder.build().hash_code() % 4).abs();
        assert_eq!(bucket, expected);
    }
+
    // A composite bucket key where one component is pinned by Eq and the other
    // by IsNull should still resolve to a single target bucket.
    #[test]
    fn test_compute_target_buckets_composite_key_with_null() {
        let fields = vec![
            DataField::new(0, "a".to_string(), DataType::Int(IntType::new())),
            DataField::new(1, "b".to_string(), DataType::Int(IntType::new())),
        ];
        // a = 1 AND b IS NULL
        let pred = Predicate::And(vec![
            Predicate::Leaf {
                column: "a".into(),
                index: 0,
                data_type: DataType::Int(IntType::new()),
                op: PredicateOperator::Eq,
                literals: vec![Datum::Int(1)],
            },
            Predicate::Leaf {
                column: "b".into(),
                index: 1,
                data_type: DataType::Int(IntType::new()),
                op: PredicateOperator::IsNull,
                literals: vec![],
            },
        ]);

        let buckets = compute_target_buckets(&pred, &fields, 8);
        assert!(
            buckets.is_some(),
            "Composite key with IsNull should determine a target bucket"
        );
        let buckets = buckets.unwrap();
        assert_eq!(buckets.len(), 1);
        let bucket = *buckets.iter().next().unwrap();
        assert!((0..8).contains(&bucket));
    }
}
diff --git a/crates/paimon/src/table/table_write.rs
b/crates/paimon/src/table/table_write.rs
new file mode 100644
index 0000000..7d5252e
--- /dev/null
+++ b/crates/paimon/src/table/table_write.rs
@@ -0,0 +1,998 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TableWrite for writing Arrow data to Paimon tables.
+//!
+//! Reference: [pypaimon
TableWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/table_write.py)
+//! and [pypaimon
FileStoreWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/file_store_write.py)
+
+use crate::arrow::format::{create_format_writer, FormatFileWriter};
+use crate::io::FileIO;
+use crate::spec::stats::BinaryTableStats;
+use crate::spec::PartitionComputer;
+use crate::spec::{
+ extract_datum_from_arrow, BinaryRow, BinaryRowBuilder, CoreOptions,
DataField, DataFileMeta,
+ DataType, Datum, EMPTY_SERIALIZED_ROW,
+};
+use crate::table::commit_message::CommitMessage;
+use crate::table::Table;
+use crate::Result;
+use arrow_array::RecordBatch;
+use chrono::Utc;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::task::JoinSet;
+
+type PartitionBucketKey = (Vec<u8>, i32);
+
/// TableWrite writes Arrow RecordBatches to Paimon data files.
///
/// Each (partition, bucket) pair gets its own `DataFileWriter` held in a HashMap.
/// Batches are routed to the correct writer based on partition/bucket.
///
/// Call `prepare_commit()` to close all writers and collect
/// `CommitMessage`s for use with `TableCommit`.
///
/// Reference: [pypaimon BatchTableWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/table_write.py)
pub struct TableWrite {
    // The target table; cloned at construction time.
    table: Table,
    // One open DataFileWriter per (serialized partition row, bucket).
    partition_writers: HashMap<PartitionBucketKey, DataFileWriter>,
    // Formats partition values into directory paths (e.g. "pt=a").
    partition_computer: PartitionComputer,
    partition_keys: Vec<String>,
    // Positions of the partition columns within the schema's field list,
    // aligned index-for-index with `partition_keys`.
    partition_field_indices: Vec<usize>,
    // Positions of the bucket-key columns within the schema's field list.
    bucket_key_indices: Vec<usize>,
    // From the 'bucket' option; -1 means unaware-bucket (everything in bucket 0).
    total_buckets: i32,
    schema_id: i64,
    // File-rolling threshold from 'target-file-size'.
    target_file_size: i64,
    // Codec name from 'file.compression' (default: zstd).
    file_compression: String,
    file_compression_zstd_level: i32,
    // Row-group flush threshold from 'write.parquet-buffer-size'.
    write_buffer_size: i64,
}
+
+impl TableWrite {
+ pub(crate) fn new(table: &Table) -> crate::Result<Self> {
+ let schema = table.schema();
+ let core_options = CoreOptions::new(schema.options());
+
+ if !schema.primary_keys().is_empty() {
+ return Err(crate::Error::Unsupported {
+ message: "TableWrite does not support tables with primary
keys".to_string(),
+ });
+ }
+ if core_options.data_evolution_enabled() {
+ return Err(crate::Error::Unsupported {
+ message: "TableWrite does not support data-evolution.enabled
mode".to_string(),
+ });
+ }
+
+ let total_buckets = core_options.bucket();
+ if total_buckets != -1 && core_options.bucket_key().is_none() {
+ return Err(crate::Error::Unsupported {
+ message: "Append tables with fixed bucket must configure
'bucket-key'".to_string(),
+ });
+ }
+ let target_file_size = core_options.target_file_size();
+ let file_compression = core_options.file_compression().to_string();
+ let file_compression_zstd_level =
core_options.file_compression_zstd_level();
+ let write_buffer_size = core_options.write_parquet_buffer_size();
+ let partition_keys: Vec<String> = schema.partition_keys().to_vec();
+ let fields = schema.fields();
+
+ let partition_field_indices: Vec<usize> = partition_keys
+ .iter()
+ .filter_map(|pk| fields.iter().position(|f| f.name() == pk))
+ .collect();
+
+ // Bucket keys: resolved by TableSchema
+ let bucket_keys = schema.bucket_keys();
+
+ let bucket_key_indices: Vec<usize> = bucket_keys
+ .iter()
+ .filter_map(|bk| fields.iter().position(|f| f.name() == bk))
+ .collect();
+
+ let partition_computer = PartitionComputer::new(
+ &partition_keys,
+ fields,
+ core_options.partition_default_name(),
+ core_options.legacy_partition_name(),
+ )
+ .unwrap();
+
+ Ok(Self {
+ table: table.clone(),
+ partition_writers: HashMap::new(),
+ partition_computer,
+ partition_keys,
+ partition_field_indices,
+ bucket_key_indices,
+ total_buckets,
+ schema_id: schema.id(),
+ target_file_size,
+ file_compression,
+ file_compression_zstd_level,
+ write_buffer_size,
+ })
+ }
+
+ /// Write an Arrow RecordBatch. Rows are routed to the correct partition
and bucket.
+ pub async fn write_arrow_batch(&mut self, batch: &RecordBatch) ->
Result<()> {
+ if batch.num_rows() == 0 {
+ return Ok(());
+ }
+
+ let grouped = self.divide_by_partition_bucket(batch)?;
+ for ((partition_bytes, bucket), sub_batch) in grouped {
+ self.write_bucket(partition_bytes, bucket, sub_batch)
+ .await?;
+ }
+ Ok(())
+ }
+
+ /// Group rows by (partition_bytes, bucket) and return sub-batches.
+ fn divide_by_partition_bucket(
+ &self,
+ batch: &RecordBatch,
+ ) -> Result<Vec<(PartitionBucketKey, RecordBatch)>> {
+ // Fast path: no partitions and single bucket — skip per-row routing
+ if self.partition_field_indices.is_empty() && self.total_buckets <= 1 {
+ return Ok(vec![((EMPTY_SERIALIZED_ROW.clone(), 0),
batch.clone())]);
+ }
+
+ let fields = self.table.schema().fields();
+ let mut groups: HashMap<PartitionBucketKey, Vec<usize>> =
HashMap::new();
+
+ for row_idx in 0..batch.num_rows() {
+ let (partition_bytes, bucket) =
+ self.extract_partition_bucket(batch, row_idx, fields)?;
+ groups
+ .entry((partition_bytes, bucket))
+ .or_default()
+ .push(row_idx);
+ }
+
+ let mut result = Vec::with_capacity(groups.len());
+ for (key, row_indices) in groups {
+ let sub_batch = if row_indices.len() == batch.num_rows() {
+ batch.clone()
+ } else {
+ let indices = arrow_array::UInt32Array::from(
+ row_indices.iter().map(|&i| i as u32).collect::<Vec<_>>(),
+ );
+ let columns: Vec<Arc<dyn arrow_array::Array>> = batch
+ .columns()
+ .iter()
+ .map(|col| arrow_select::take::take(col.as_ref(),
&indices, None))
+ .collect::<std::result::Result<Vec<_>, _>>()
+ .map_err(|e| crate::Error::DataInvalid {
+ message: format!("Failed to take rows: {e}"),
+ source: None,
+ })?;
+ RecordBatch::try_new(batch.schema(), columns).map_err(|e| {
+ crate::Error::DataInvalid {
+ message: format!("Failed to create sub-batch: {e}"),
+ source: None,
+ }
+ })?
+ };
+ result.push((key, sub_batch));
+ }
+ Ok(result)
+ }
+
+ /// Write a batch directly to the DataFileWriter for the given (partition,
bucket).
+ async fn write_bucket(
+ &mut self,
+ partition_bytes: Vec<u8>,
+ bucket: i32,
+ batch: RecordBatch,
+ ) -> Result<()> {
+ let key = (partition_bytes, bucket);
+ if !self.partition_writers.contains_key(&key) {
+ self.create_writer(key.0.clone(), key.1)?;
+ }
+ let writer = self.partition_writers.get_mut(&key).unwrap();
+ writer.write(&batch).await
+ }
+
+ /// Write multiple Arrow RecordBatches.
+ pub async fn write_arrow(&mut self, batches: &[RecordBatch]) -> Result<()>
{
+ for batch in batches {
+ self.write_arrow_batch(batch).await?;
+ }
+ Ok(())
+ }
+
+ /// Close all writers and collect CommitMessages for use with TableCommit.
+ /// Writers are cleared after this call, allowing the TableWrite to be
reused.
+ pub async fn prepare_commit(&mut self) -> Result<Vec<CommitMessage>> {
+ let writers: Vec<(PartitionBucketKey, DataFileWriter)> =
+ self.partition_writers.drain().collect();
+
+ let futures: Vec<_> = writers
+ .into_iter()
+ .map(|((partition_bytes, bucket), mut writer)| async move {
+ let files = writer.prepare_commit().await?;
+ Ok::<_, crate::Error>((partition_bytes, bucket, files))
+ })
+ .collect();
+
+ let results = futures::future::try_join_all(futures).await?;
+
+ let mut messages = Vec::new();
+ for (partition_bytes, bucket, files) in results {
+ if !files.is_empty() {
+ messages.push(CommitMessage::new(partition_bytes, bucket,
files));
+ }
+ }
+ Ok(messages)
+ }
+
+ /// Extract partition bytes and bucket for a single row.
+ fn extract_partition_bucket(
+ &self,
+ batch: &RecordBatch,
+ row_idx: usize,
+ fields: &[DataField],
+ ) -> Result<PartitionBucketKey> {
+ // Build partition BinaryRow
+ let partition_bytes = if self.partition_field_indices.is_empty() {
+ EMPTY_SERIALIZED_ROW.clone()
+ } else {
+ let mut builder =
BinaryRowBuilder::new(self.partition_field_indices.len() as i32);
+ for (pos, &field_idx) in
self.partition_field_indices.iter().enumerate() {
+ let field = &fields[field_idx];
+ match extract_datum_from_arrow(batch, row_idx, field_idx,
field.data_type())? {
+ Some(datum) => builder.write_datum(pos, &datum,
field.data_type()),
+ None => builder.set_null_at(pos),
+ }
+ }
+ builder.build_serialized()
+ };
+
+ // Compute bucket
+ let bucket = if self.total_buckets <= 1 ||
self.bucket_key_indices.is_empty() {
+ 0
+ } else {
+ let mut datums: Vec<(Option<Datum>, DataType)> = Vec::new();
+ for &field_idx in &self.bucket_key_indices {
+ let field = &fields[field_idx];
+ let datum = extract_datum_from_arrow(batch, row_idx,
field_idx, field.data_type())?;
+ datums.push((datum, field.data_type().clone()));
+ }
+ let refs: Vec<(Option<&Datum>, &DataType)> =
+ datums.iter().map(|(d, t)| (d.as_ref(), t)).collect();
+ BinaryRow::compute_bucket_from_datums(&refs, self.total_buckets)
+ };
+
+ Ok((partition_bytes, bucket))
+ }
+
+ fn create_writer(&mut self, partition_bytes: Vec<u8>, bucket: i32) ->
Result<()> {
+ let partition_path = if self.partition_keys.is_empty() {
+ String::new()
+ } else {
+ let row = BinaryRow::from_serialized_bytes(&partition_bytes)?;
+ self.partition_computer.generate_partition_path(&row)?
+ };
+
+ let writer = DataFileWriter::new(
+ self.table.file_io().clone(),
+ self.table.location().to_string(),
+ partition_path,
+ bucket,
+ self.schema_id,
+ self.target_file_size,
+ self.file_compression.clone(),
+ self.file_compression_zstd_level,
+ self.write_buffer_size,
+ );
+
+ self.partition_writers
+ .insert((partition_bytes, bucket), writer);
+ Ok(())
+ }
+}
+
/// Internal writer that produces parquet data files for a single (partition, bucket).
///
/// Batches are accumulated into a single `FormatFileWriter` that streams directly
/// to storage. Call `prepare_commit()` to finalize and collect file metadata.
struct DataFileWriter {
    file_io: FileIO,
    // Root location of the table; files go under <location>/<partition>/bucket-<n>/.
    table_location: String,
    // Partition directory component (e.g. "pt=a"); empty for unpartitioned tables.
    partition_path: String,
    bucket: i32,
    schema_id: i64,
    // Roll to a new file once the current file reaches this many bytes.
    target_file_size: i64,
    file_compression: String,
    file_compression_zstd_level: i32,
    // Flush a row group once the in-progress buffer reaches this many bytes.
    write_buffer_size: i64,
    // Metadata of files already closed synchronously (not in-flight ones).
    written_files: Vec<DataFileMeta>,
    /// Background file close tasks spawned during rolling.
    in_flight_closes: JoinSet<Result<DataFileMeta>>,
    /// Current open format writer, lazily created on first write.
    current_writer: Option<Box<dyn FormatFileWriter>>,
    current_file_name: Option<String>,
    current_row_count: i64,
}
+
impl DataFileWriter {
    // Plain constructor; no file is opened until the first write().
    #[allow(clippy::too_many_arguments)]
    fn new(
        file_io: FileIO,
        table_location: String,
        partition_path: String,
        bucket: i32,
        schema_id: i64,
        target_file_size: i64,
        file_compression: String,
        file_compression_zstd_level: i32,
        write_buffer_size: i64,
    ) -> Self {
        Self {
            file_io,
            table_location,
            partition_path,
            bucket,
            schema_id,
            target_file_size,
            file_compression,
            file_compression_zstd_level,
            write_buffer_size,
            written_files: Vec::new(),
            in_flight_closes: JoinSet::new(),
            current_writer: None,
            current_file_name: None,
            current_row_count: 0,
        }
    }

    /// Write a RecordBatch. Rolls to a new file when target size is reached.
    async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // Lazily open the file on first write; the Arrow schema of the first
        // batch determines the file schema.
        if self.current_writer.is_none() {
            self.open_new_file(batch.schema()).await?;
        }

        self.current_row_count += batch.num_rows() as i64;
        self.current_writer.as_mut().unwrap().write(batch).await?;

        // Roll to a new file if target size is reached — close in background
        if self.current_writer.as_ref().unwrap().num_bytes() as i64 >= self.target_file_size {
            self.roll_file();
        }

        // Flush row group if in-progress buffer exceeds write_buffer_size.
        // (`if let` because roll_file() above may have taken the writer.)
        if let Some(w) = self.current_writer.as_mut() {
            if w.in_progress_size() as i64 >= self.write_buffer_size {
                w.flush().await?;
            }
        }

        Ok(())
    }

    // Open a fresh parquet file under <table>/<partition>/bucket-<n>/ and make
    // it the current writer.
    async fn open_new_file(&mut self, schema: arrow_schema::SchemaRef) -> Result<()> {
        // NOTE(review): the numeric suffix is written_files.len(), which does not
        // count files still closing in in_flight_closes, so consecutive files can
        // share a suffix; the UUID keeps names unique regardless — confirm intent.
        let file_name = format!(
            "data-{}-{}.parquet",
            uuid::Uuid::new_v4(),
            self.written_files.len()
        );

        let bucket_dir = if self.partition_path.is_empty() {
            format!("{}/bucket-{}", self.table_location, self.bucket)
        } else {
            format!(
                "{}/{}/bucket-{}",
                self.table_location, self.partition_path, self.bucket
            )
        };
        self.file_io.mkdirs(&format!("{bucket_dir}/")).await?;

        let file_path = format!("{}/{}", bucket_dir, file_name);
        let output = self.file_io.new_output(&file_path)?;
        let writer = create_format_writer(
            &output,
            schema,
            &self.file_compression,
            self.file_compression_zstd_level,
        )
        .await?;
        self.current_writer = Some(writer);
        self.current_file_name = Some(file_name);
        self.current_row_count = 0;
        Ok(())
    }

    /// Close the current file writer and record the file metadata.
    /// No-op when no file is open.
    async fn close_current_file(&mut self) -> Result<()> {
        let writer = match self.current_writer.take() {
            Some(w) => w,
            None => return Ok(()),
        };
        let file_name = self.current_file_name.take().unwrap();

        let row_count = self.current_row_count;
        self.current_row_count = 0;
        // close() returns the final file size in bytes.
        let file_size = writer.close().await? as i64;

        let meta = Self::build_meta(file_name, file_size, row_count, self.schema_id);
        self.written_files.push(meta);
        Ok(())
    }

    /// Spawn the current writer's close in the background for non-blocking rolling.
    /// The resulting metadata is collected in prepare_commit().
    fn roll_file(&mut self) {
        let writer = match self.current_writer.take() {
            Some(w) => w,
            None => return,
        };
        let file_name = self.current_file_name.take().unwrap();
        let row_count = self.current_row_count;
        self.current_row_count = 0;
        let schema_id = self.schema_id;

        self.in_flight_closes.spawn(async move {
            let file_size = writer.close().await? as i64;
            Ok(Self::build_meta(file_name, file_size, row_count, schema_id))
        });
    }

    /// Close the current writer and return all written file metadata.
    ///
    /// Drains the background close tasks spawned by roll_file(); a panicked
    /// task surfaces as DataInvalid. Leaves the writer empty and reusable.
    async fn prepare_commit(&mut self) -> Result<Vec<DataFileMeta>> {
        self.close_current_file().await?;
        while let Some(result) = self.in_flight_closes.join_next().await {
            let meta = result.map_err(|e| crate::Error::DataInvalid {
                message: format!("Background file close task panicked: {e}"),
                source: None,
            })??;
            self.written_files.push(meta);
        }
        Ok(std::mem::take(&mut self.written_files))
    }

    // Assemble DataFileMeta for an append file. Key/value stats and keys are
    // left empty here; sequence numbers are zeroed.
    fn build_meta(
        file_name: String,
        file_size: i64,
        row_count: i64,
        schema_id: i64,
    ) -> DataFileMeta {
        DataFileMeta {
            file_name,
            file_size,
            row_count,
            min_key: EMPTY_SERIALIZED_ROW.clone(),
            max_key: EMPTY_SERIALIZED_ROW.clone(),
            key_stats: BinaryTableStats::new(
                EMPTY_SERIALIZED_ROW.clone(),
                EMPTY_SERIALIZED_ROW.clone(),
                vec![],
            ),
            value_stats: BinaryTableStats::new(
                EMPTY_SERIALIZED_ROW.clone(),
                EMPTY_SERIALIZED_ROW.clone(),
                vec![],
            ),
            min_sequence_number: 0,
            max_sequence_number: 0,
            schema_id,
            level: 0,
            extra_files: vec![],
            creation_time: Some(Utc::now()),
            delete_row_count: Some(0),
            embedded_index: None,
            file_source: Some(0), // APPEND
            value_stats_cols: Some(vec![]),
            external_path: None,
            first_row_id: None,
            write_cols: None,
        }
    }
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::catalog::Identifier;
+ use crate::io::FileIOBuilder;
+ use crate::spec::{
+ DataType, DecimalType, IntType, LocalZonedTimestampType, Schema,
TableSchema,
+ TimestampType, VarCharType,
+ };
+ use crate::table::{SnapshotManager, TableCommit};
+ use arrow_array::Int32Array;
+ use arrow_schema::{
+ DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
TimeUnit,
+ };
+ use std::sync::Arc;
+
+ fn test_file_io() -> FileIO {
+ FileIOBuilder::new("memory").build().unwrap()
+ }
+
+ fn test_schema() -> TableSchema {
+ let schema = Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .build()
+ .unwrap();
+ TableSchema::new(0, &schema)
+ }
+
+ fn test_partitioned_schema() -> TableSchema {
+ let schema = Schema::builder()
+ .column("pt", DataType::VarChar(VarCharType::string_type()))
+ .column("id", DataType::Int(IntType::new()))
+ .partition_keys(["pt"])
+ .build()
+ .unwrap();
+ TableSchema::new(0, &schema)
+ }
+
+ fn test_table(file_io: &FileIO, table_path: &str) -> Table {
+ Table::new(
+ file_io.clone(),
+ Identifier::new("default", "test_table"),
+ table_path.to_string(),
+ test_schema(),
+ None,
+ )
+ }
+
+ fn test_partitioned_table(file_io: &FileIO, table_path: &str) -> Table {
+ Table::new(
+ file_io.clone(),
+ Identifier::new("default", "test_table"),
+ table_path.to_string(),
+ test_partitioned_schema(),
+ None,
+ )
+ }
+
+ async fn setup_dirs(file_io: &FileIO, table_path: &str) {
+ file_io
+ .mkdirs(&format!("{table_path}/snapshot/"))
+ .await
+ .unwrap();
+ file_io
+ .mkdirs(&format!("{table_path}/manifest/"))
+ .await
+ .unwrap();
+ }
+
+ fn make_batch(ids: Vec<i32>, values: Vec<i32>) -> RecordBatch {
+ let schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("id", ArrowDataType::Int32, false),
+ ArrowField::new("value", ArrowDataType::Int32, false),
+ ]));
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(Int32Array::from(ids)),
+ Arc::new(Int32Array::from(values)),
+ ],
+ )
+ .unwrap()
+ }
+
+ fn make_partitioned_batch(pts: Vec<&str>, ids: Vec<i32>) -> RecordBatch {
+ let schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("pt", ArrowDataType::Utf8, false),
+ ArrowField::new("id", ArrowDataType::Int32, false),
+ ]));
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(arrow_array::StringArray::from(pts)),
+ Arc::new(Int32Array::from(ids)),
+ ],
+ )
+ .unwrap()
+ }
+
+ #[tokio::test]
+ async fn test_write_and_commit() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_table_write";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = test_table(&file_io, table_path);
+ let mut table_write = TableWrite::new(&table).unwrap();
+
+ let batch = make_batch(vec![1, 2, 3], vec![10, 20, 30]);
+ table_write.write_arrow_batch(&batch).await.unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ assert_eq!(messages.len(), 1);
+ assert_eq!(messages[0].bucket, 0);
+ assert_eq!(messages[0].new_files.len(), 1);
+ assert_eq!(messages[0].new_files[0].row_count, 3);
+
+ // Commit and verify snapshot
+ let commit = TableCommit::new(table, "test-user".to_string());
+ commit.commit(messages).await.unwrap();
+
+ let snap_manager = SnapshotManager::new(file_io.clone(),
table_path.to_string());
+ let snapshot =
snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+ assert_eq!(snapshot.id(), 1);
+ assert_eq!(snapshot.total_record_count(), Some(3));
+ }
+
+ #[tokio::test]
+ async fn test_write_partitioned() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_table_write_partitioned";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = test_partitioned_table(&file_io, table_path);
+ let mut table_write = TableWrite::new(&table).unwrap();
+
+ let batch = make_partitioned_batch(vec!["a", "b", "a"], vec![1, 2, 3]);
+ table_write.write_arrow_batch(&batch).await.unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ // Should have 2 commit messages (one per partition)
+ assert_eq!(messages.len(), 2);
+
+ let total_rows: i64 = messages
+ .iter()
+ .flat_map(|m| &m.new_files)
+ .map(|f| f.row_count)
+ .sum();
+ assert_eq!(total_rows, 3);
+
+ // Commit and verify
+ let commit = TableCommit::new(table, "test-user".to_string());
+ commit.commit(messages).await.unwrap();
+
+ let snap_manager = SnapshotManager::new(file_io.clone(),
table_path.to_string());
+ let snapshot =
snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+ assert_eq!(snapshot.id(), 1);
+ assert_eq!(snapshot.total_record_count(), Some(3));
+ }
+
+ #[tokio::test]
+ async fn test_write_empty_batch() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_table_write_empty";
+ let table = test_table(&file_io, table_path);
+ let mut table_write = TableWrite::new(&table).unwrap();
+
+ let batch = make_batch(vec![], vec![]);
+ table_write.write_arrow_batch(&batch).await.unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ assert!(messages.is_empty());
+ }
+
+ #[tokio::test]
+ async fn test_prepare_commit_reusable() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_table_write_reuse";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = test_table(&file_io, table_path);
+ let mut table_write = TableWrite::new(&table).unwrap();
+
+ // First write + prepare_commit
+ table_write
+ .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20]))
+ .await
+ .unwrap();
+ let messages1 = table_write.prepare_commit().await.unwrap();
+ assert_eq!(messages1.len(), 1);
+ assert_eq!(messages1[0].new_files[0].row_count, 2);
+
+ // Second write + prepare_commit (reuse)
+ table_write
+ .write_arrow_batch(&make_batch(vec![3, 4, 5], vec![30, 40, 50]))
+ .await
+ .unwrap();
+ let messages2 = table_write.prepare_commit().await.unwrap();
+ assert_eq!(messages2.len(), 1);
+ assert_eq!(messages2[0].new_files[0].row_count, 3);
+
+ // Empty prepare_commit is fine
+ let messages3 = table_write.prepare_commit().await.unwrap();
+ assert!(messages3.is_empty());
+ }
+
+ #[tokio::test]
+ async fn test_write_multiple_batches() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_table_write_multi";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = test_table(&file_io, table_path);
+ let mut table_write = TableWrite::new(&table).unwrap();
+
+ table_write
+ .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20]))
+ .await
+ .unwrap();
+ table_write
+ .write_arrow_batch(&make_batch(vec![3, 4], vec![30, 40]))
+ .await
+ .unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ assert_eq!(messages.len(), 1);
+ // Multiple batches accumulate into a single file
+ assert_eq!(messages[0].new_files.len(), 1);
+
+ let total_rows: i64 = messages[0].new_files.iter().map(|f|
f.row_count).sum();
+ assert_eq!(total_rows, 4);
+ }
+
+ fn test_bucketed_schema() -> TableSchema {
+ let schema = Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .option("bucket", "4")
+ .option("bucket-key", "id")
+ .build()
+ .unwrap();
+ TableSchema::new(0, &schema)
+ }
+
+ fn test_bucketed_table(file_io: &FileIO, table_path: &str) -> Table {
+ Table::new(
+ file_io.clone(),
+ Identifier::new("default", "test_table"),
+ table_path.to_string(),
+ test_bucketed_schema(),
+ None,
+ )
+ }
+
+ /// Build a batch where the bucket-key column ("id") is nullable.
+ fn make_nullable_id_batch(ids: Vec<Option<i32>>, values: Vec<i32>) ->
RecordBatch {
+ let schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("id", ArrowDataType::Int32, true),
+ ArrowField::new("value", ArrowDataType::Int32, false),
+ ]));
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(Int32Array::from(ids)),
+ Arc::new(Int32Array::from(values)),
+ ],
+ )
+ .unwrap()
+ }
+
+ #[tokio::test]
+ async fn test_write_bucketed_with_null_bucket_key() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_table_write_null_bk";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = test_bucketed_table(&file_io, table_path);
+ let mut table_write = TableWrite::new(&table).unwrap();
+
+ // Row with NULL bucket key should not panic
+ let batch = make_nullable_id_batch(vec![None, Some(1), None], vec![10,
20, 30]);
+ table_write.write_arrow_batch(&batch).await.unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ let total_rows: i64 = messages
+ .iter()
+ .flat_map(|m| &m.new_files)
+ .map(|f| f.row_count)
+ .sum();
+ assert_eq!(total_rows, 3);
+ }
+
+ #[tokio::test]
+ async fn test_null_bucket_key_routes_consistently() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_table_write_null_bk_consistent";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = test_bucketed_table(&file_io, table_path);
+ let mut table_write = TableWrite::new(&table).unwrap();
+
+ // Two NULLs should land in the same bucket
+ let batch = make_nullable_id_batch(vec![None, None], vec![10, 20]);
+ table_write.write_arrow_batch(&batch).await.unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ // Both NULL-key rows must be in the same (partition, bucket) group
+ let null_bucket_rows: i64 = messages
+ .iter()
+ .flat_map(|m| &m.new_files)
+ .map(|f| f.row_count)
+ .sum();
+ assert_eq!(null_bucket_rows, 2);
+ // All NULL-key rows go to exactly one bucket
+ assert_eq!(messages.len(), 1);
+ }
+
+ #[tokio::test]
+ async fn test_null_vs_nonnull_bucket_key_differ() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_table_write_null_vs_nonnull";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = test_bucketed_table(&file_io, table_path);
+
+ // Compute bucket for NULL key
+ let fields = table.schema().fields().to_vec();
+ let tw = TableWrite::new(&table).unwrap();
+
+ let batch_null = make_nullable_id_batch(vec![None], vec![10]);
+ let (_, bucket_null) = tw
+ .extract_partition_bucket(&batch_null, 0, &fields)
+ .unwrap();
+
+ // Compute bucket for key = 0 (the value a null field's fixed bytes
happen to be)
+ let batch_zero = make_nullable_id_batch(vec![Some(0)], vec![20]);
+ let (_, bucket_zero) = tw
+ .extract_partition_bucket(&batch_zero, 0, &fields)
+ .unwrap();
+
+ // A NULL bucket key must produce a BinaryRow with the null bit set,
+ // which hashes differently from a non-null 0 value.
+ // (With 4 buckets they could theoretically collide, but the hash
codes differ.)
+ let mut builder_null = BinaryRowBuilder::new(1);
+ builder_null.set_null_at(0);
+ let hash_null = builder_null.build().hash_code();
+
+ let mut builder_zero = BinaryRowBuilder::new(1);
+ builder_zero.write_int(0, 0);
+ let hash_zero = builder_zero.build().hash_code();
+
+ assert_ne!(hash_null, hash_zero, "NULL and 0 should hash differently");
+ // If hashes differ, buckets should differ (with 4 buckets, very
likely)
+ // But we verify the hash difference is the important invariant
+ let _ = (bucket_null, bucket_zero);
+ }
+
+ /// Mirrors Java's testUnCompactDecimalAndTimestampNullValueBucketNumber.
+ /// Non-compact types (Decimal(38,18), LocalZonedTimestamp(6),
Timestamp(6))
+ /// use variable-length encoding in BinaryRow — NULL handling must still
work.
+ #[tokio::test]
+ async fn test_non_compact_null_bucket_key() {
+ let file_io = test_file_io();
+
+ let bucket_cols = ["d", "ltz", "ntz"];
+ let total_buckets = 16;
+
+ for bucket_col in &bucket_cols {
+ let table_path = format!("memory:/test_null_bk_{bucket_col}");
+ setup_dirs(&file_io, &table_path).await;
+
+ let schema = Schema::builder()
+ .column("d", DataType::Decimal(DecimalType::new(38,
18).unwrap()))
+ .column(
+ "ltz",
+
DataType::LocalZonedTimestamp(LocalZonedTimestampType::new(6).unwrap()),
+ )
+ .column("ntz",
DataType::Timestamp(TimestampType::new(6).unwrap()))
+ .column("k", DataType::Int(IntType::new()))
+ .option("bucket", total_buckets.to_string())
+ .option("bucket-key", *bucket_col)
+ .build()
+ .unwrap();
+ let table_schema = TableSchema::new(0, &schema);
+ let table = Table::new(
+ file_io.clone(),
+ Identifier::new("default", "test_table"),
+ table_path.to_string(),
+ table_schema,
+ None,
+ );
+
+ let tw = TableWrite::new(&table).unwrap();
+ let fields = table.schema().fields().to_vec();
+
+ // Build a batch: d=NULL, ltz=NULL, ntz=NULL, k=1
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("d", ArrowDataType::Decimal128(38, 18), true),
+ ArrowField::new(
+ "ltz",
+ ArrowDataType::Timestamp(TimeUnit::Microsecond,
Some("UTC".into())),
+ true,
+ ),
+ ArrowField::new(
+ "ntz",
+ ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ ),
+ ArrowField::new("k", ArrowDataType::Int32, false),
+ ]));
+ let batch = RecordBatch::try_new(
+ arrow_schema,
+ vec![
+ Arc::new(
+ arrow_array::Decimal128Array::from(vec![None::<i128>])
+ .with_precision_and_scale(38, 18)
+ .unwrap(),
+ ),
+ Arc::new(
+
arrow_array::TimestampMicrosecondArray::from(vec![None::<i64>])
+ .with_timezone("UTC"),
+ ),
+ Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
+ None::<i64>,
+ ])),
+ Arc::new(Int32Array::from(vec![1])),
+ ],
+ )
+ .unwrap();
+
+ let (_, bucket) = tw.extract_partition_bucket(&batch, 0,
&fields).unwrap();
+
+ // Expected: BinaryRow with 1 field, null at pos 0
+ let mut builder = BinaryRowBuilder::new(1);
+ builder.set_null_at(0);
+ let expected_bucket = (builder.build().hash_code() %
total_buckets).abs();
+
+ assert_eq!(
+ bucket, expected_bucket,
+ "NULL bucket-key '{bucket_col}' should produce bucket
{expected_bucket}, got {bucket}"
+ );
+ }
+ }
+
+ #[tokio::test]
+ async fn test_write_rolling_on_target_file_size() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_table_write_rolling";
+ setup_dirs(&file_io, table_path).await;
+
+ // Create table with very small target-file-size to trigger rolling
+ let schema = Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .option("target-file-size", "1b")
+ .build()
+ .unwrap();
+ let table_schema = TableSchema::new(0, &schema);
+ let table = Table::new(
+ file_io.clone(),
+ Identifier::new("default", "test_table"),
+ table_path.to_string(),
+ table_schema,
+ None,
+ );
+
+ let mut table_write = TableWrite::new(&table).unwrap();
+
+ // Write multiple batches — each should roll to a new file
+ table_write
+ .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20]))
+ .await
+ .unwrap();
+ table_write
+ .write_arrow_batch(&make_batch(vec![3, 4], vec![30, 40]))
+ .await
+ .unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ assert_eq!(messages.len(), 1);
+ // With 1-byte target, each batch should produce a separate file
+ assert_eq!(messages[0].new_files.len(), 2);
+
+ let total_rows: i64 = messages[0].new_files.iter().map(|f|
f.row_count).sum();
+ assert_eq!(total_rows, 4);
+ }
+}
diff --git a/crates/paimon/src/table/write_builder.rs
b/crates/paimon/src/table/write_builder.rs
index d6458cf..45db333 100644
--- a/crates/paimon/src/table/write_builder.rs
+++ b/crates/paimon/src/table/write_builder.rs
@@ -19,9 +19,7 @@
//!
//! Reference: [pypaimon
WriteBuilder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/write_builder.py)
-use crate::spec::Datum;
-use crate::table::{Table, TableCommit};
-use std::collections::HashMap;
+use crate::table::{Table, TableCommit, TableWrite};
use uuid::Uuid;
/// Builder for creating table writers and committers.
@@ -31,7 +29,6 @@ use uuid::Uuid;
pub struct WriteBuilder<'a> {
    // The table that writers and committers produced by this builder target.
    table: &'a Table,
    // Identifier recorded with commits; generated once per builder (UUID v4)
    // so all writers/committers from one builder share the same user.
    commit_user: String,
}
impl<'a> WriteBuilder<'a> {
@@ -39,25 +36,16 @@ impl<'a> WriteBuilder<'a> {
Self {
table,
commit_user: Uuid::new_v4().to_string(),
- overwrite_partition: None,
}
}
- /// Set overwrite mode. If `partition` is None, overwrites the entire
table.
- /// If `partition` is Some, overwrites only the specified partition.
- pub fn overwrite(&mut self, partition: Option<HashMap<String, Datum>>) ->
&mut Self {
- self.overwrite_partition = Some(partition.unwrap_or_default());
- self
- }
-
/// Create a new TableCommit for committing write results.
pub fn new_commit(&self) -> TableCommit {
- TableCommit::new(
- self.table.clone(),
- self.commit_user.clone(),
- self.overwrite_partition.clone(),
- )
+ TableCommit::new(self.table.clone(), self.commit_user.clone())
}
- // TODO: pub fn new_write(&self) -> TableWrite { ... }
    /// Create a new TableWrite for writing Arrow data.
    ///
    /// Returns an error if `TableWrite::new` fails for this builder's table.
    pub fn new_write(&self) -> crate::Result<TableWrite> {
        TableWrite::new(self.table)
    }
}