This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 13e370a feat: support input changelog for primary key writes (#318)
13e370a is described below
commit 13e370ae5cb450351dcbdabb60d3ea7cc79847ab
Author: QuakeWang <[email protected]>
AuthorDate: Wed May 20 16:13:00 2026 +0800
feat: support input changelog for primary key writes (#318)
---
crates/integrations/datafusion/tests/pk_tables.rs | 32 +-
crates/paimon/src/spec/core_options.rs | 166 ++++++-
crates/paimon/src/table/commit_message.rs | 3 +
crates/paimon/src/table/kv_file_writer.rs | 260 ++++++++---
crates/paimon/src/table/mod.rs | 1 +
crates/paimon/src/table/prepared_files.rs | 33 ++
crates/paimon/src/table/table_commit.rs | 126 +++++-
crates/paimon/src/table/table_write.rs | 512 +++++++++++++++++++++-
8 files changed, 1032 insertions(+), 101 deletions(-)
diff --git a/crates/integrations/datafusion/tests/pk_tables.rs
b/crates/integrations/datafusion/tests/pk_tables.rs
index fa0bd6f..bf1a317 100644
--- a/crates/integrations/datafusion/tests/pk_tables.rs
+++ b/crates/integrations/datafusion/tests/pk_tables.rs
@@ -1460,9 +1460,9 @@ async fn test_pk_partitioned_multi_bucket() {
// ======================= Error Cases =======================
-/// PK table with changelog-producer=input should be rejected.
+/// PK table with changelog-producer=input should write through DataFusion SQL.
#[tokio::test]
-async fn test_pk_reject_changelog_producer_input() {
+async fn test_pk_input_changelog_write_read() {
let (_tmp, sql_context) = setup_sql_context().await;
sql_context
@@ -1475,18 +1475,24 @@ async fn test_pk_reject_changelog_producer_input() {
.await
.unwrap();
- let result = sql_context
- .sql("INSERT INTO paimon.test_db.t_changelog VALUES (1, 'alice')")
- .await;
+ sql_context
+ .sql(
+ "INSERT INTO paimon.test_db.t_changelog VALUES
+ (1, 'alice'), (1, 'bob'), (2, 'carol')",
+ )
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
- let is_err = match result {
- Err(_) => true,
- Ok(df) => df.collect().await.is_err(),
- };
- assert!(
- is_err,
- "PK table with changelog-producer=input should reject writes"
- );
+ let rows = collect_id_name(
+ &sql_context,
+ "SELECT id, name FROM paimon.test_db.t_changelog ORDER BY id",
+ )
+ .await;
+
+ assert_eq!(rows, vec![(1, "bob".to_string()), (2, "carol".to_string())]);
}
// ======================= String Primary Key =======================
diff --git a/crates/paimon/src/spec/core_options.rs
b/crates/paimon/src/spec/core_options.rs
index bafad0a..aafdf52 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -40,6 +40,10 @@ const COMMIT_MAX_RETRY_WAIT_OPTION: &str =
"commit.max-retry-wait";
const FILE_COMPRESSION_OPTION: &str = "file.compression";
const FILE_COMPRESSION_ZSTD_LEVEL_OPTION: &str = "file.compression.zstd-level";
const FILE_FORMAT_OPTION: &str = "file.format";
+const CHANGELOG_FILE_PREFIX_OPTION: &str = "changelog-file.prefix";
+const CHANGELOG_FILE_FORMAT_OPTION: &str = "changelog-file.format";
+const CHANGELOG_FILE_COMPRESSION_OPTION: &str = "changelog-file.compression";
+const CHANGELOG_FILE_STATS_MODE_OPTION: &str = "changelog-file.stats-mode";
const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled";
const WRITE_PARQUET_BUFFER_SIZE_OPTION: &str = "write.parquet-buffer-size";
const SEQUENCE_FIELD_OPTION: &str = "sequence.field";
@@ -55,6 +59,7 @@ pub const SCAN_VERSION_OPTION: &str = "scan.version";
const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
+const DEFAULT_CHANGELOG_FILE_PREFIX: &str = "changelog-";
const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024;
const DEFAULT_WRITE_PARQUET_BUFFER_SIZE: i64 = 256 * 1024 * 1024;
const DYNAMIC_BUCKET_TARGET_ROW_NUM_OPTION: &str =
"dynamic-bucket.target-row-num";
@@ -75,6 +80,32 @@ pub enum MergeEngine {
FirstRow,
}
+/// Changelog producer for table writes.
+///
+/// Reference: Java `CoreOptions.ChangelogProducer`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ChangelogProducer {
+ /// No changelog file.
+ None,
+ /// Double write input rows to changelog files.
+ Input,
+ /// Generate changelog files during full compaction.
+ FullCompaction,
+ /// Generate changelog files through lookup compaction.
+ Lookup,
+}
+
+impl ChangelogProducer {
+ pub fn as_str(&self) -> &'static str {
+ match self {
+ Self::None => "none",
+ Self::Input => "input",
+ Self::FullCompaction => "full-compaction",
+ Self::Lookup => "lookup",
+ }
+ }
+}
+
/// Format the bucket directory name for a given bucket number.
/// Returns `"bucket-postpone"` for `POSTPONE_BUCKET` (-2), otherwise
`"bucket-{N}"`.
pub fn bucket_dir_name(bucket: i32) -> String {
@@ -138,7 +169,7 @@ impl<'a> CoreOptions<'a> {
}
}
- /// Changelog producer setting. Default is "none".
+ /// Raw changelog producer setting. Default is `"none"`.
pub fn changelog_producer(&self) -> &str {
self.options
.get(CHANGELOG_PRODUCER_OPTION)
@@ -146,6 +177,22 @@ impl<'a> CoreOptions<'a> {
.unwrap_or("none")
}
+ /// Typed changelog producer setting. Default is `None`.
+ pub fn try_changelog_producer(&self) -> crate::Result<ChangelogProducer> {
+ match self.options.get(CHANGELOG_PRODUCER_OPTION) {
+ None => Ok(ChangelogProducer::None),
+ Some(v) => match v.to_ascii_lowercase().as_str() {
+ "none" => Ok(ChangelogProducer::None),
+ "input" => Ok(ChangelogProducer::Input),
+ "full-compaction" => Ok(ChangelogProducer::FullCompaction),
+ "lookup" => Ok(ChangelogProducer::Lookup),
+ other => Err(crate::Error::Unsupported {
+ message: format!("Unsupported changelog-producer: '{other}'"),
+ }),
+ },
+ }
+ }
+
/// The `rowkind.field` option: a user column whose value encodes the row
kind.
pub fn rowkind_field(&self) -> Option<&str> {
self.options.get(ROWKIND_FIELD_OPTION).map(String::as_str)
@@ -363,6 +410,43 @@ impl<'a> CoreOptions<'a> {
.unwrap_or(1)
}
+ /// File name prefix for changelog files. Default is `"changelog-"`.
+ pub fn changelog_file_prefix(&self) -> &str {
+ self.options
+ .get(CHANGELOG_FILE_PREFIX_OPTION)
+ .map(String::as_str)
+ .unwrap_or(DEFAULT_CHANGELOG_FILE_PREFIX)
+ }
+
+ /// Effective file format for changelog files.
+ ///
+ /// When `changelog-file.format` is not configured, Java Paimon falls back
+ /// to the table `file.format`.
+ pub fn changelog_file_format(&self) -> &str {
+ self.options
+ .get(CHANGELOG_FILE_FORMAT_OPTION)
+ .map(String::as_str)
+ .unwrap_or_else(|| self.file_format())
+ }
+
+ /// Effective compression codec for changelog files.
+ ///
+ /// When `changelog-file.compression` is not configured, Java Paimon falls
+ /// back to the table `file.compression`.
+ pub fn changelog_file_compression(&self) -> &str {
+ self.options
+ .get(CHANGELOG_FILE_COMPRESSION_OPTION)
+ .map(String::as_str)
+ .unwrap_or_else(|| self.file_compression())
+ }
+
+ /// Metadata stats collection mode for changelog files, if configured.
+ pub fn changelog_file_stats_mode(&self) -> Option<&str> {
+ self.options
+ .get(CHANGELOG_FILE_STATS_MODE_OPTION)
+ .map(String::as_str)
+ }
+
/// Parquet writer in-progress buffer size limit. Default is 256MB.
/// When the buffered data exceeds this, the writer flushes the current
row group.
pub fn write_parquet_buffer_size(&self) -> i64 {
@@ -546,6 +630,86 @@ mod tests {
assert_eq!(core.merge_engine().unwrap(), MergeEngine::PartialUpdate);
}
+ #[test]
+ fn test_changelog_producer_defaults_to_none() {
+ let options = HashMap::new();
+ let core = CoreOptions::new(&options);
+
+ assert_eq!(core.changelog_producer(), "none");
+ assert_eq!(
+ core.try_changelog_producer().unwrap(),
+ ChangelogProducer::None
+ );
+ }
+
+ #[test]
+ fn test_changelog_producer_accepts_known_values() {
+ for (value, expected) in [
+ ("none", ChangelogProducer::None),
+ ("input", ChangelogProducer::Input),
+ ("full-compaction", ChangelogProducer::FullCompaction),
+ ("lookup", ChangelogProducer::Lookup),
+ ("INPUT", ChangelogProducer::Input),
+ ] {
+ let options =
HashMap::from([(CHANGELOG_PRODUCER_OPTION.to_string(), value.into())]);
+ let core = CoreOptions::new(&options);
+
+ assert_eq!(core.try_changelog_producer().unwrap(), expected);
+ }
+ }
+
+ #[test]
+ fn test_changelog_producer_rejects_unknown_values() {
+ let options = HashMap::from([(CHANGELOG_PRODUCER_OPTION.to_string(),
"other".into())]);
+ let core = CoreOptions::new(&options);
+
+ let err = core
+ .try_changelog_producer()
+ .expect_err("unknown producer should fail");
+ assert!(
+ matches!(err, crate::Error::Unsupported { message } if
message.contains("Unsupported changelog-producer"))
+ );
+ }
+
+ #[test]
+ fn test_changelog_file_options_defaults_and_overrides() {
+ let default_options = HashMap::from([
+ (FILE_FORMAT_OPTION.to_string(), "avro".to_string()),
+ (FILE_COMPRESSION_OPTION.to_string(), "snappy".to_string()),
+ ]);
+ let default_core = CoreOptions::new(&default_options);
+
+ assert_eq!(default_core.changelog_file_prefix(), "changelog-");
+ assert_eq!(default_core.changelog_file_format(), "avro");
+ assert_eq!(default_core.changelog_file_compression(), "snappy");
+ assert_eq!(default_core.changelog_file_stats_mode(), None);
+
+ let custom_options = HashMap::from([
+ (
+ CHANGELOG_FILE_PREFIX_OPTION.to_string(),
+ "custom-".to_string(),
+ ),
+ (
+ CHANGELOG_FILE_FORMAT_OPTION.to_string(),
+ "parquet".to_string(),
+ ),
+ (
+ CHANGELOG_FILE_COMPRESSION_OPTION.to_string(),
+ "zstd".to_string(),
+ ),
+ (
+ CHANGELOG_FILE_STATS_MODE_OPTION.to_string(),
+ "counts".to_string(),
+ ),
+ ]);
+ let custom_core = CoreOptions::new(&custom_options);
+
+ assert_eq!(custom_core.changelog_file_prefix(), "custom-");
+ assert_eq!(custom_core.changelog_file_format(), "parquet");
+ assert_eq!(custom_core.changelog_file_compression(), "zstd");
+ assert_eq!(custom_core.changelog_file_stats_mode(), Some("counts"));
+ }
+
#[test]
fn test_commit_options_defaults() {
let options = HashMap::new();
diff --git a/crates/paimon/src/table/commit_message.rs
b/crates/paimon/src/table/commit_message.rs
index ac7efb9..88d3533 100644
--- a/crates/paimon/src/table/commit_message.rs
+++ b/crates/paimon/src/table/commit_message.rs
@@ -29,6 +29,8 @@ pub struct CommitMessage {
pub bucket: i32,
/// New data files to be added.
pub new_files: Vec<DataFileMeta>,
+ /// New changelog files to be added.
+ pub new_changelog_files: Vec<DataFileMeta>,
/// New index files to be added (used by dynamic bucket mode).
pub new_index_files: Vec<IndexFileMeta>,
/// Files to be deleted (copy-on-write rewrite: old files replaced by
new_files).
@@ -41,6 +43,7 @@ impl CommitMessage {
partition,
bucket,
new_files,
+ new_changelog_files: Vec::new(),
new_index_files: Vec::new(),
deleted_files: Vec::new(),
}
diff --git a/crates/paimon/src/table/kv_file_writer.rs
b/crates/paimon/src/table/kv_file_writer.rs
index a6d7180..55f1d6d 100644
--- a/crates/paimon/src/table/kv_file_writer.rs
+++ b/crates/paimon/src/table/kv_file_writer.rs
@@ -31,10 +31,12 @@ use crate::io::FileIO;
use crate::spec::stats::{compute_column_stats, BinaryTableStats};
use crate::spec::{
extract_datum_from_arrow, BinaryRowBuilder, DataFileMeta, DataType,
MergeEngine,
- PartialUpdateConfig, EMPTY_SERIALIZED_ROW, SEQUENCE_NUMBER_FIELD_NAME,
VALUE_KIND_FIELD_NAME,
+ PartialUpdateConfig, RowKind, EMPTY_SERIALIZED_ROW,
SEQUENCE_NUMBER_FIELD_NAME,
+ VALUE_KIND_FIELD_NAME,
};
+use crate::table::prepared_files::PreparedFiles;
use crate::Result;
-use arrow_array::{Int64Array, Int8Array, RecordBatch};
+use arrow_array::{Array, Int64Array, Int8Array, RecordBatch, UInt32Array};
use arrow_ord::sort::{lexsort_to_indices, SortColumn, SortOptions};
use arrow_row::{RowConverter, SortField};
use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as
ArrowSchema};
@@ -55,6 +57,8 @@ pub(crate) struct KeyValueFileWriter {
buffer_bytes: usize,
/// Completed file metadata.
written_files: Vec<DataFileMeta>,
+ /// Completed changelog file metadata.
+ written_changelog_files: Vec<DataFileMeta>,
}
/// Configuration for [`KeyValueFileWriter`], grouping file-location, schema,
@@ -70,6 +74,10 @@ pub(crate) struct KeyValueWriteConfig {
pub file_compression_zstd_level: i32,
pub write_buffer_size: i64,
pub file_format: String,
+ pub input_changelog: bool,
+ pub changelog_file_prefix: String,
+ pub changelog_file_compression: String,
+ pub changelog_file_format: String,
/// Primary key column indices in the user schema.
pub primary_key_indices: Vec<usize>,
/// Paimon DataTypes for each primary key column (same order as
primary_key_indices).
@@ -81,6 +89,16 @@ pub(crate) struct KeyValueWriteConfig {
pub deletion_vectors_enabled: bool,
}
+struct IndexedFileWrite<'a> {
+ file_prefix: &'a str,
+ file_ordinal: usize,
+ file_format: &'a str,
+ file_compression: &'a str,
+ min_sequence_number: i64,
+ max_sequence_number: i64,
+ delete_row_count: i64,
+}
+
impl KeyValueFileWriter {
pub(crate) fn new(
file_io: FileIO,
@@ -108,6 +126,7 @@ impl KeyValueFileWriter {
buffer: Vec::new(),
buffer_bytes: 0,
written_files: Vec::new(),
+ written_changelog_files: Vec::new(),
})
}
@@ -208,23 +227,82 @@ impl KeyValueFileWriter {
// FirstRow → keep first row per key group (lowest seq)
// PartialUpdate → keep all rows for read-side field-wise merge
let selected_indices = self.select_flush_indices(&combined,
&sorted_indices)?;
- let selected_num_rows = selected_indices.len();
+ let selected_u32 = UInt32Array::from(selected_indices);
- // Extract min_key / max_key from selected endpoints.
- let first_row = selected_indices[0] as usize;
- let last_row = selected_indices[selected_num_rows - 1] as usize;
- let min_key = self.extract_key_binary_row(&combined, first_row)?;
- let max_key = self.extract_key_binary_row(&combined, last_row)?;
+ let data_delete_row_count = Self::indexed_delete_row_count(&combined,
&selected_u32)?;
+ let changelog_delete_row_count = if self.config.input_changelog {
+ Some(Self::indexed_delete_row_count(&combined, &sorted_indices)?)
+ } else {
+ None
+ };
- // Build physical schema and open writer.
- let physical_schema = build_physical_schema(&user_schema);
+ let data_file = self
+ .write_indexed_file(
+ &combined,
+ seq_array.as_ref(),
+ &selected_u32,
+ IndexedFileWrite {
+ file_prefix: "data-",
+ file_ordinal: self.written_files.len(),
+ file_format: &self.config.file_format,
+ file_compression: &self.config.file_compression,
+ min_sequence_number: start_seq,
+ max_sequence_number: end_seq,
+ delete_row_count: data_delete_row_count,
+ },
+ )
+ .await?;
+ self.written_files.push(data_file);
+
+ if let Some(delete_row_count) = changelog_delete_row_count {
+ let changelog_file = self
+ .write_indexed_file(
+ &combined,
+ seq_array.as_ref(),
+ &sorted_indices,
+ IndexedFileWrite {
+ file_prefix: &self.config.changelog_file_prefix,
+ file_ordinal: self.written_changelog_files.len(),
+ file_format: &self.config.changelog_file_format,
+ file_compression:
&self.config.changelog_file_compression,
+ min_sequence_number: start_seq,
+ max_sequence_number: end_seq,
+ delete_row_count,
+ },
+ )
+ .await?;
+ self.written_changelog_files.push(changelog_file);
+ }
+ Ok(())
+ }
+
+ async fn write_indexed_file(
+ &self,
+ batch: &RecordBatch,
+ seq_array: &dyn Array,
+ indices: &UInt32Array,
+ write: IndexedFileWrite<'_>,
+ ) -> Result<DataFileMeta> {
+ if indices.is_empty() {
+ return Err(crate::Error::DataInvalid {
+ message: "Cannot write an empty key-value data file".to_string(),
+ source: None,
+ });
+ }
- // Open file writer.
+ let user_schema = batch.schema();
+ let first_row = indices.value(0) as usize;
+ let last_row = indices.value(indices.len() - 1) as usize;
+ let min_key = self.extract_key_binary_row(batch, first_row)?;
+ let max_key = self.extract_key_binary_row(batch, last_row)?;
+
+ let physical_schema = build_physical_schema(&user_schema);
let file_name = format!(
- "data-{}-{}.{}",
+ "{}{}-{}.{}",
+ write.file_prefix,
uuid::Uuid::new_v4(),
- self.written_files.len(),
- self.config.file_format,
+ write.file_ordinal,
+ write.file_format,
);
let bucket_dir = if self.config.partition_path.is_empty() {
format!(
@@ -238,44 +316,42 @@ impl KeyValueFileWriter {
)
};
self.file_io.mkdirs(&format!("{bucket_dir}/")).await?;
- let file_path = format!("{}/{}", bucket_dir, file_name);
+ let file_path = format!("{bucket_dir}/{file_name}");
let output = self.file_io.new_output(&file_path)?;
let mut writer = create_format_writer(
&output,
physical_schema.clone(),
- &self.config.file_compression,
+ write.file_compression,
self.config.file_compression_zstd_level,
None,
)
.await?;
- // Chunked write using selected indices.
- let selected_u32 = arrow_array::UInt32Array::from(selected_indices);
- for chunk_start in
(0..selected_num_rows).step_by(Self::FLUSH_CHUNK_ROWS) {
- let chunk_len = Self::FLUSH_CHUNK_ROWS.min(selected_num_rows -
chunk_start);
- let chunk_indices = selected_u32.slice(chunk_start, chunk_len);
+ let vk_idx = batch
+ .schema()
+ .fields()
+ .iter()
+ .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME);
+
+ for chunk_start in (0..indices.len()).step_by(Self::FLUSH_CHUNK_ROWS) {
+ let chunk_len = Self::FLUSH_CHUNK_ROWS.min(indices.len() -
chunk_start);
+ let chunk_indices = indices.slice(chunk_start, chunk_len);
- let mut physical_columns: Vec<Arc<dyn arrow_array::Array>> =
Vec::new();
- // Sequence numbers for this chunk.
+ let mut physical_columns: Vec<Arc<dyn Array>> = Vec::new();
physical_columns.push(
- arrow_select::take::take(seq_array.as_ref(), &chunk_indices,
None).map_err(
- |e| crate::Error::DataInvalid {
+ arrow_select::take::take(seq_array, &chunk_indices,
None).map_err(|e| {
+ crate::Error::DataInvalid {
+ message: format!("Failed to reorder sequence numbers: {e}"),
source: None,
- },
- )?,
+ }
+ })?,
);
- // Value kind column — resolve from batch schema.
- let vk_idx = combined
- .schema()
- .fields()
- .iter()
- .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME);
+
match vk_idx {
Some(vk_idx) => {
physical_columns.push(
arrow_select::take::take(
- combined.column(vk_idx).as_ref(),
+ batch.column(vk_idx).as_ref(),
&chunk_indices,
None,
)
@@ -286,21 +362,20 @@ impl KeyValueFileWriter {
);
}
None => {
- // All rows are INSERT (value_kind = 0).
physical_columns.push(Arc::new(Int8Array::from(vec![0i8;
chunk_len])));
}
}
- // All user columns (skip _VALUE_KIND if present — already handled
above).
- for idx in 0..combined.num_columns() {
+
+ for idx in 0..batch.num_columns() {
if Some(idx) == vk_idx {
continue;
}
physical_columns.push(
- arrow_select::take::take(combined.column(idx).as_ref(),
&chunk_indices, None)
+ arrow_select::take::take(batch.column(idx).as_ref(),
&chunk_indices, None)
.map_err(|e| crate::Error::DataInvalid {
- message: format!("Failed to reorder by sort indices: {e}"),
- source: None,
- })?,
+ message: format!("Failed to reorder by sort indices: {e}"),
+ source: None,
+ })?,
);
}
@@ -314,20 +389,20 @@ impl KeyValueFileWriter {
let file_size = writer.close().await? as i64;
- // Compute key_stats on selected output rows (not the raw combined
batch).
- let selected_key_columns: Vec<Arc<dyn arrow_array::Array>> = self
+ let key_columns: Vec<Arc<dyn Array>> = self
.config
.primary_key_indices
.iter()
.map(|&idx| {
- arrow_select::take::take(combined.column(idx).as_ref(),
&selected_u32, None)
- .map_err(|e| crate::Error::DataInvalid {
+ arrow_select::take::take(batch.column(idx).as_ref(), indices,
None).map_err(|e| {
+ crate::Error::DataInvalid {
+ message: format!("Failed to take key column for stats: {e}"),
source: None,
- })
+ }
+ })
})
.collect::<Result<Vec<_>>>()?;
- let selected_key_batch = RecordBatch::try_new(
+ let key_batch = RecordBatch::try_new(
Arc::new(ArrowSchema::new(
self.config
.primary_key_indices
@@ -335,24 +410,23 @@ impl KeyValueFileWriter {
.map(|&idx| user_schema.field(idx).clone())
.collect::<Vec<_>>(),
)),
- selected_key_columns,
+ key_columns,
)
.map_err(|e| crate::Error::DataInvalid {
- message: format!("Failed to build selected key batch for stats: {e}"),
+ message: format!("Failed to build key batch for stats: {e}"),
source: None,
})?;
let stats_col_indices: Vec<usize> =
(0..self.config.primary_key_indices.len()).collect();
let key_stats = compute_column_stats(
- &selected_key_batch,
+ &key_batch,
&stats_col_indices,
&self.config.primary_key_types,
)?;
- // Sequence numbers span the full assigned range.
- let meta = DataFileMeta {
+ Ok(DataFileMeta {
file_name,
file_size,
- row_count: selected_num_rows as i64,
+ row_count: indices.len() as i64,
min_key,
max_key,
key_stats,
@@ -361,22 +435,54 @@ impl KeyValueFileWriter {
EMPTY_SERIALIZED_ROW.clone(),
vec![],
),
- min_sequence_number: start_seq,
- max_sequence_number: end_seq,
+ min_sequence_number: write.min_sequence_number,
+ max_sequence_number: write.max_sequence_number,
schema_id: self.config.schema_id,
level: 0,
extra_files: vec![],
creation_time: Some(Utc::now()),
- delete_row_count: Some(0),
+ delete_row_count: Some(write.delete_row_count),
embedded_index: None,
file_source: Some(0), // FileSource.APPEND
value_stats_cols: Some(vec![]),
external_path: None,
first_row_id: None,
write_cols: None,
+ })
+ }
+
+ fn indexed_delete_row_count(batch: &RecordBatch, indices: &UInt32Array) ->
Result<i64> {
+ let Some(vk_idx) = batch
+ .schema()
+ .fields()
+ .iter()
+ .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME)
+ else {
+ return Ok(0);
};
- self.written_files.push(meta);
- Ok(())
+
+ let column = batch.column(vk_idx);
+ let Some(value_kinds) = column.as_any().downcast_ref::<Int8Array>()
else {
+ return Err(crate::Error::DataInvalid {
+ message: "_VALUE_KIND column must be Int8".to_string(),
+ source: None,
+ });
+ };
+
+ let mut delete_count = 0;
+ for idx in 0..indices.len() {
+ let row = indices.value(idx) as usize;
+ let value = if column.is_null(row) {
+ 0
+ } else {
+ value_kinds.value(row)
+ };
+ match RowKind::from_value(value)? {
+ RowKind::UpdateBefore | RowKind::Delete => delete_count += 1,
+ RowKind::Insert | RowKind::UpdateAfter => {}
+ }
+ }
+ Ok(delete_count)
}
/// Select output row indices from sorted inputs according to merge engine.
@@ -468,9 +574,12 @@ impl KeyValueFileWriter {
}
/// Flush remaining buffer and return all written file metadata.
- pub(crate) async fn prepare_commit(&mut self) -> Result<Vec<DataFileMeta>>
{
+ pub(crate) async fn prepare_commit(&mut self) -> Result<PreparedFiles> {
self.flush().await?;
- Ok(std::mem::take(&mut self.written_files))
+ Ok(PreparedFiles {
+ data_files: std::mem::take(&mut self.written_files),
+ changelog_files: std::mem::take(&mut self.written_changelog_files),
+ })
}
/// Extract primary key columns from a batch at a given row index into a
serialized BinaryRow.
@@ -539,6 +648,10 @@ mod tests {
file_compression_zstd_level: 0,
write_buffer_size: 1024,
file_format: "parquet".to_string(),
+ input_changelog: false,
+ changelog_file_prefix: "changelog-".to_string(),
+ changelog_file_compression: "none".to_string(),
+ changelog_file_format: "parquet".to_string(),
primary_key_indices: vec![0],
primary_key_types: vec![DataType::Int(IntType::new())],
sequence_field_indices: vec![1],
@@ -610,6 +723,33 @@ mod tests {
assert_eq!(selected, vec![0, 1]);
}
+ #[test]
+ fn test_indexed_delete_row_count_rejects_invalid_value_kind() {
+ let schema = Arc::new(ArrowSchema::new(vec![
+ Arc::new(ArrowField::new("id", ArrowDataType::Int32, false)),
+ Arc::new(ArrowField::new(
+ VALUE_KIND_FIELD_NAME,
+ ArrowDataType::Int8,
+ false,
+ )),
+ ]));
+ let batch = RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(Int32Array::from(vec![1])) as Arc<dyn
arrow_array::Array>,
+ Arc::new(Int8Array::from(vec![4])) as Arc<dyn
arrow_array::Array>,
+ ],
+ )
+ .unwrap();
+ let indices = UInt32Array::from(vec![0]);
+
+ let err = KeyValueFileWriter::indexed_delete_row_count(&batch,
&indices).unwrap_err();
+
+ assert!(
+ matches!(err, crate::Error::DataInvalid { message, .. } if
message.contains("Invalid RowKind value"))
+ );
+ }
+
#[test]
fn test_new_rejects_partial_update_with_deletion_vectors() {
let mut config = test_write_config(MergeEngine::PartialUpdate);
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index c00fa78..c425a9e 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -39,6 +39,7 @@ mod kv_file_reader;
mod kv_file_writer;
mod partition_filter;
mod postpone_file_writer;
+mod prepared_files;
mod read_builder;
pub mod referenced_files;
pub(crate) mod rest_env;
diff --git a/crates/paimon/src/table/prepared_files.rs
b/crates/paimon/src/table/prepared_files.rs
new file mode 100644
index 0000000..d2c89f9
--- /dev/null
+++ b/crates/paimon/src/table/prepared_files.rs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::spec::DataFileMeta;
+
+/// Files produced by closing a writer.
+pub(crate) struct PreparedFiles {
+ pub data_files: Vec<DataFileMeta>,
+ pub changelog_files: Vec<DataFileMeta>,
+}
+
+impl PreparedFiles {
+ pub(crate) fn data(data_files: Vec<DataFileMeta>) -> Self {
+ Self {
+ data_files,
+ changelog_files: Vec::new(),
+ }
+ }
+}
diff --git a/crates/paimon/src/table/table_commit.rs
b/crates/paimon/src/table/table_commit.rs
index 0de494e..664b654 100644
--- a/crates/paimon/src/table/table_commit.rs
+++ b/crates/paimon/src/table/table_commit.rs
@@ -98,9 +98,11 @@ impl TableCommit {
}
let entries = self.messages_to_entries(&commit_messages);
+ let changelog_entries =
self.messages_to_changelog_entries(&commit_messages);
let new_index_entries =
self.messages_to_index_entries(&commit_messages);
self.try_commit(CommitEntriesPlan::Direct {
entries,
+ changelog_entries,
new_index_entries,
})
.await
@@ -129,6 +131,9 @@ impl TableCommit {
let new_entries = self.messages_to_entries(&commit_messages);
let new_index_entries =
self.messages_to_index_entries(&commit_messages);
+ let has_new_data_entries = new_entries
+ .iter()
+ .any(|entry| *entry.kind() == FileKind::Add);
let partition_filter = if let Some(sp) = static_partitions {
let partition_keys = self.table.schema().partition_keys();
@@ -144,8 +149,10 @@ impl TableCommit {
} else {
Some(self.build_static_partition_predicate(&sp,
&partition_fields)?)
}
+ } else if !self.table.schema().partition_fields().is_empty() &&
!has_new_data_entries {
+ return Ok(());
} else {
- self.build_dynamic_partition_filter(&commit_messages)?
+ self.build_dynamic_partition_filter(&new_entries)?
};
self.try_commit(CommitEntriesPlan::Overwrite {
@@ -182,13 +189,13 @@ impl TableCommit {
Ok(PartitionFilter::from_predicate(combined, partition_fields))
}
- /// Build a dynamic partition filter from the partitions present in commit
messages.
+ /// Build a dynamic partition filter from the partitions present in new
data entries.
///
/// Returns `None` for unpartitioned tables (full table overwrite).
/// Uses `PartitionSet` for O(1) byte-level matching.
fn build_dynamic_partition_filter(
&self,
- commit_messages: &[CommitMessage],
+ entries: &[ManifestEntry],
) -> Result<Option<PartitionFilter>> {
let partition_fields = self.table.schema().partition_fields();
if partition_fields.is_empty() {
@@ -196,8 +203,10 @@ impl TableCommit {
}
let mut partition_bytes_set: HashSet<Vec<u8>> = HashSet::new();
- for msg in commit_messages {
- partition_bytes_set.insert(msg.partition.clone());
+ for entry in entries {
+ if *entry.kind() == FileKind::Add {
+ partition_bytes_set.insert(entry.partition().to_vec());
+ }
}
Ok(Some(PartitionFilter::from_partition_set(
@@ -298,7 +307,7 @@ impl TableCommit {
let latest_snapshot =
self.snapshot_manager.get_latest_snapshot().await?;
let resolved = self.resolve_commit(&plan, &latest_snapshot).await?;
- if resolved.entries.is_empty() {
+ if resolved.entries.is_empty() &&
resolved.changelog_entries.is_empty() {
break;
}
@@ -376,11 +385,15 @@ impl TableCommit {
let unique_id = uuid::Uuid::new_v4();
let base_manifest_list_name = format!("manifest-list-{unique_id}-0");
let delta_manifest_list_name = format!("manifest-list-{unique_id}-1");
+ let changelog_manifest_list_name =
format!("manifest-list-{unique_id}-2");
let new_manifest_name = format!("manifest-{}-0", uuid::Uuid::new_v4());
+ let changelog_manifest_name = format!("manifest-{}-1",
uuid::Uuid::new_v4());
let base_manifest_list_path =
format!("{manifest_dir}/{base_manifest_list_name}");
let delta_manifest_list_path =
format!("{manifest_dir}/{delta_manifest_list_name}");
+ let changelog_manifest_list_path =
format!("{manifest_dir}/{changelog_manifest_list_name}");
let new_manifest_path = format!("{manifest_dir}/{new_manifest_name}");
+ let changelog_manifest_path =
format!("{manifest_dir}/{changelog_manifest_name}");
// Write manifest file
let new_manifest_file_meta = self
@@ -400,6 +413,32 @@ impl TableCommit {
)
.await?;
+ let changelog_record_count = if resolved.changelog_entries.is_empty() {
+ None
+ } else {
+ let changelog_manifest_file_meta = self
+ .write_manifest_file(
+ file_io,
+ &changelog_manifest_path,
+ &changelog_manifest_name,
+ &resolved.changelog_entries,
+ )
+ .await?;
+ ManifestList::write(
+ file_io,
+ &changelog_manifest_list_path,
+ &[changelog_manifest_file_meta],
+ )
+ .await?;
+ Some(
+ resolved
+ .changelog_entries
+ .iter()
+ .map(|entry| entry.file().row_count)
+ .sum(),
+ )
+ };
+
// Read existing manifests (base + delta from previous snapshot) and
write base manifest list
let mut total_record_count: i64 = 0;
let existing_manifest_files = if let Some(snap) = latest_snapshot {
@@ -441,6 +480,8 @@ impl TableCommit {
.time_millis(current_time_millis())
.total_record_count(Some(total_record_count))
.delta_record_count(Some(delta_record_count))
+ .changelog_manifest_list(changelog_record_count.map(|_|
changelog_manifest_list_name))
+ .changelog_record_count(changelog_record_count)
.next_row_id(next_row_id)
.index_manifest(resolved.index_manifest_name)
.build();
@@ -533,6 +574,7 @@ impl TableCommit {
match plan {
CommitEntriesPlan::Direct {
entries,
+ changelog_entries,
new_index_entries,
} => {
if self.row_tracking_enabled {
@@ -576,6 +618,7 @@ impl TableCommit {
Ok(ResolvedCommit {
entries: entries.clone(),
+ changelog_entries: changelog_entries.clone(),
kind,
index_manifest_name,
})
@@ -613,6 +656,7 @@ impl TableCommit {
Ok(ResolvedCommit {
entries,
+ changelog_entries: vec![],
kind: CommitKind::OVERWRITE,
index_manifest_name,
})
@@ -1031,6 +1075,25 @@ impl TableCommit {
.collect()
}
+ /// Convert commit messages to changelog manifest entries (ADD kind only).
+ fn messages_to_changelog_entries(&self, messages: &[CommitMessage]) ->
Vec<ManifestEntry> {
+ messages
+ .iter()
+ .flat_map(|msg| {
+ msg.new_changelog_files.iter().map(|file| {
+ ManifestEntry::new(
+ FileKind::Add,
+ msg.partition.clone(),
+ msg.bucket,
+ self.total_buckets,
+ file.clone(),
+ 0,
+ )
+ })
+ })
+ .collect()
+ }
+
/// Convert commit messages to index manifest entries (ADD kind).
fn messages_to_index_entries(&self, messages: &[CommitMessage]) ->
Vec<IndexManifestEntry> {
messages
@@ -1056,6 +1119,7 @@ enum CommitEntriesPlan {
/// rewrites, in which case `resolve_commit` auto-promotes to
`CommitKind::OVERWRITE`.
Direct {
entries: Vec<ManifestEntry>,
+ changelog_entries: Vec<ManifestEntry>,
new_index_entries: Vec<IndexManifestEntry>,
},
/// Overwrite with optional partition filter.
@@ -1069,6 +1133,7 @@ enum CommitEntriesPlan {
/// Fully resolved commit ready for writing.
struct ResolvedCommit {
entries: Vec<ManifestEntry>,
+ changelog_entries: Vec<ManifestEntry>,
kind: CommitKind,
index_manifest_name: Option<String>,
}
@@ -1371,6 +1436,35 @@ mod tests {
assert_eq!(snapshot.total_record_count(), Some(250));
}
+ #[tokio::test]
+ async fn test_dynamic_overwrite_ignores_changelog_only_message() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_dynamic_overwrite_changelog_only";
+ setup_dirs(&file_io, table_path).await;
+
+ let commit = setup_partitioned_commit(&file_io, table_path);
+ commit
+ .commit(vec![CommitMessage::new(
+ partition_bytes("a"),
+ 0,
+ vec![test_data_file("data-a.parquet", 100)],
+ )])
+ .await
+ .unwrap();
+
+ let mut message = CommitMessage::new(partition_bytes("a"), 0, vec![]);
+ message.new_changelog_files = vec![test_data_file("changelog-a.parquet", 1)];
+
+ commit.overwrite(vec![message], None).await.unwrap();
+
+ let snap_manager = SnapshotManager::new(file_io, table_path.to_string());
+ let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+ assert_eq!(snapshot.id(), 1);
+ assert_eq!(snapshot.commit_kind(), &CommitKind::APPEND);
+ assert_eq!(snapshot.total_record_count(), Some(100));
+ assert_eq!(snapshot.changelog_manifest_list(), None);
+ }
+
#[tokio::test]
async fn test_drop_partitions() {
let file_io = test_file_io();
@@ -1622,6 +1716,26 @@ mod tests {
assert_eq!(snapshot.total_record_count(), Some(350));
}
+ #[tokio::test]
+ async fn test_overwrite_ignores_changelog_files() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_overwrite_changelog_files";
+ setup_dirs(&file_io, table_path).await;
+
+ let commit = setup_commit(&file_io, table_path);
+ let mut message = CommitMessage::new(vec![], 0, vec![test_data_file("data.parquet", 1)]);
+ message.new_changelog_files = vec![test_data_file("changelog.parquet", 1)];
+
+ commit.overwrite(vec![message], None).await.unwrap();
+
+ let snap_manager = SnapshotManager::new(file_io, table_path.to_string());
+ let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+ assert_eq!(snapshot.commit_kind(), &CommitKind::OVERWRITE);
+ assert_eq!(snapshot.total_record_count(), Some(1));
+ assert_eq!(snapshot.changelog_record_count(), None);
+ assert_eq!(snapshot.changelog_manifest_list(), None);
+ }
+
#[tokio::test]
async fn test_delete_conflict_rejects_missing_file() {
let file_io = test_file_io();
diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs
index 49c0019..f34c124 100644
--- a/crates/paimon/src/table/table_write.rs
+++ b/crates/paimon/src/table/table_write.rs
@@ -21,10 +21,10 @@
//! and [pypaimon FileStoreWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/file_store_write.py)
use crate::arrow::build_target_arrow_schema;
-use crate::spec::DataFileMeta;
use crate::spec::PartitionComputer;
use crate::spec::{
- BinaryRow, CoreOptions, DataType, MergeEngine, EMPTY_SERIALIZED_ROW, POSTPONE_BUCKET,
+ BinaryRow, ChangelogProducer, CoreOptions, DataType, MergeEngine, EMPTY_SERIALIZED_ROW,
+ POSTPONE_BUCKET,
};
use crate::table::blob_file_writer::AppendBlobFileWriter;
use crate::table::bucket_assigner::{BucketAssignerEnum, PartitionBucketKey};
@@ -37,6 +37,7 @@ use crate::table::data_file_writer::DataFileWriter;
use crate::table::kv_file_writer::{KeyValueFileWriter, KeyValueWriteConfig};
use crate::table::partition_filter::PartitionFilter;
use crate::table::postpone_file_writer::{PostponeFileWriter, PostponeWriteConfig};
+use crate::table::prepared_files::PreparedFiles;
use crate::table::{SnapshotManager, Table, TableScan};
use crate::Result;
use arrow_array::RecordBatch;
@@ -61,12 +62,12 @@ impl FileWriter {
}
}
- async fn prepare_commit(mut self) -> Result<Vec<DataFileMeta>> {
+ async fn prepare_commit(mut self) -> Result<PreparedFiles> {
match self {
- FileWriter::Append(ref mut w) => w.prepare_commit().await,
- FileWriter::AppendBlob(ref mut w) => w.prepare_commit().await,
+ FileWriter::Append(ref mut w) => w.prepare_commit().await.map(PreparedFiles::data),
+ FileWriter::AppendBlob(ref mut w) => w.prepare_commit().await.map(PreparedFiles::data),
FileWriter::KeyValue(ref mut w) => w.prepare_commit().await,
- FileWriter::Postpone(ref mut w) => w.prepare_commit().await,
+ FileWriter::Postpone(ref mut w) => w.prepare_commit().await.map(PreparedFiles::data),
}
}
}
@@ -96,6 +97,10 @@ pub struct TableWrite {
primary_key_types: Vec<DataType>,
sequence_field_indices: Vec<usize>,
merge_engine: MergeEngine,
+ changelog_producer: ChangelogProducer,
+ changelog_file_prefix: String,
+ changelog_file_format: String,
+ changelog_file_compression: String,
partition_seq_cache: HashMap<Vec<u8>, HashMap<i32, i64>>,
commit_user: String,
/// Bucket assignment strategy (fixed, dynamic, or cross-partition).
@@ -138,6 +143,7 @@ impl TableWrite {
let total_buckets = core_options.bucket();
let has_primary_keys = !schema.primary_keys().is_empty();
let is_dynamic_bucket = has_primary_keys && total_buckets == -1;
+ let changelog_producer = core_options.try_changelog_producer()?;
let is_dynamic_cross_partition =
is_dynamic_bucket && !schema.partition_keys().is_empty() && {
@@ -160,16 +166,6 @@ impl TableWrite {
),
});
}
- if has_primary_keys
- && total_buckets != POSTPONE_BUCKET
- && core_options
- .changelog_producer()
- .eq_ignore_ascii_case("input")
- {
- return Err(crate::Error::Unsupported {
- message: "KeyValueFileWriter does not support changelog-producer=input".to_string(),
- });
- }
if !has_primary_keys && total_buckets != -1 && core_options.bucket_key().is_none() {
return Err(crate::Error::Unsupported {
@@ -181,6 +177,9 @@ impl TableWrite {
let file_compression = core_options.file_compression().to_string();
let file_compression_zstd_level = core_options.file_compression_zstd_level();
let file_format = core_options.file_format().to_string();
+ let changelog_file_prefix = core_options.changelog_file_prefix().to_string();
+ let changelog_file_format = core_options.changelog_file_format().to_string();
+ let changelog_file_compression = core_options.changelog_file_compression().to_string();
let write_buffer_size = core_options.write_parquet_buffer_size();
let partition_keys: Vec<String> = schema.partition_keys().to_vec();
let fields = schema.fields();
@@ -295,6 +294,10 @@ impl TableWrite {
primary_key_types,
sequence_field_indices,
merge_engine,
+ changelog_producer,
+ changelog_file_prefix,
+ changelog_file_format,
+ changelog_file_compression,
partition_seq_cache: HashMap::new(),
commit_user,
bucket_assigner,
@@ -543,8 +546,12 @@ impl TableWrite {
for (partition_bytes, bucket, files) in results {
let key = (partition_bytes.clone(), bucket);
let index_files = index_files_by_key.remove(&key).unwrap_or_default();
- if !files.is_empty() || !index_files.is_empty() {
- let mut msg = CommitMessage::new(partition_bytes, bucket, files);
+ if !files.data_files.is_empty()
+ || !files.changelog_files.is_empty()
+ || !index_files.is_empty()
+ {
+ let mut msg = CommitMessage::new(partition_bytes, bucket, files.data_files);
+ msg.new_changelog_files = files.changelog_files;
msg.new_index_files = index_files;
messages.push(msg);
}
@@ -682,6 +689,11 @@ impl TableWrite {
file_compression_zstd_level: self.file_compression_zstd_level,
write_buffer_size: self.write_buffer_size,
file_format: self.file_format.clone(),
+ input_changelog: self.changelog_producer == ChangelogProducer::Input
+ && !self.is_overwrite,
+ changelog_file_prefix: self.changelog_file_prefix.clone(),
+ changelog_file_compression: self.changelog_file_compression.clone(),
+ changelog_file_format: self.changelog_file_format.clone(),
primary_key_indices: self.primary_key_indices.clone(),
primary_key_types: self.primary_key_types.clone(),
sequence_field_indices: self.sequence_field_indices.clone(),
@@ -697,15 +709,18 @@ impl TableWrite {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::arrow::format::create_format_reader;
use crate::catalog::Identifier;
use crate::io::{FileIO, FileIOBuilder};
use crate::spec::{
- BinaryRowBuilder, BlobType, DataType, DecimalType, IntType, LocalZonedTimestampType,
- Schema, TableSchema, TimestampType, VarCharType,
+ bucket_dir_name, BigIntType, BinaryRowBuilder, BlobType, DataField, DataType, DecimalType,
+ FileKind, IndexManifest, IntType, LocalZonedTimestampType, Manifest, ManifestList, Schema,
+ TableSchema, TimestampType, TinyIntType, VarCharType, SEQUENCE_NUMBER_FIELD_ID,
+ SEQUENCE_NUMBER_FIELD_NAME, VALUE_KIND_FIELD_ID, VALUE_KIND_FIELD_NAME,
};
use crate::table::{SnapshotManager, TableCommit};
- use arrow_array::Int32Array;
use arrow_array::RecordBatchReader as _;
+ use arrow_array::{Int32Array, Int64Array, Int8Array};
use arrow_schema::{
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, TimeUnit,
};
@@ -790,6 +805,147 @@ mod tests {
.unwrap()
}
+ fn make_batch_with_value_kind(
+ ids: Vec<i32>,
+ values: Vec<i32>,
+ value_kinds: Vec<i8>,
+ ) -> RecordBatch {
+ let schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("id", ArrowDataType::Int32, false),
+ ArrowField::new("value", ArrowDataType::Int32, false),
+ ArrowField::new(
+ crate::spec::VALUE_KIND_FIELD_NAME,
+ ArrowDataType::Int8,
+ false,
+ ),
+ ]));
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(Int32Array::from(ids)),
+ Arc::new(Int32Array::from(values)),
+ Arc::new(Int8Array::from(value_kinds)),
+ ],
+ )
+ .unwrap()
+ }
+
+ fn make_partitioned_batch_with_value_kind(
+ pts: Vec<&str>,
+ ids: Vec<i32>,
+ values: Vec<i32>,
+ value_kinds: Vec<i8>,
+ ) -> RecordBatch {
+ let schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("pt", ArrowDataType::Utf8, false),
+ ArrowField::new("id", ArrowDataType::Int32, false),
+ ArrowField::new("value", ArrowDataType::Int32, false),
+ ArrowField::new(
+ crate::spec::VALUE_KIND_FIELD_NAME,
+ ArrowDataType::Int8,
+ false,
+ ),
+ ]));
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(arrow_array::StringArray::from(pts)),
+ Arc::new(Int32Array::from(ids)),
+ Arc::new(Int32Array::from(values)),
+ Arc::new(Int8Array::from(value_kinds)),
+ ],
+ )
+ .unwrap()
+ }
+
+ fn physical_key_value_fields() -> Vec<DataField> {
+ vec![
+ DataField::new(
+ SEQUENCE_NUMBER_FIELD_ID,
+ SEQUENCE_NUMBER_FIELD_NAME.to_string(),
+ DataType::BigInt(BigIntType::new()),
+ ),
+ DataField::new(
+ VALUE_KIND_FIELD_ID,
+ VALUE_KIND_FIELD_NAME.to_string(),
+ DataType::TinyInt(TinyIntType::new()),
+ ),
+ DataField::new(0, "id".to_string(), DataType::Int(IntType::new())),
+ DataField::new(1, "value".to_string(), DataType::Int(IntType::new())),
+ ]
+ }
+
+ async fn read_physical_key_value_batches(
+ file_io: &FileIO,
+ file_path: &str,
+ file_size: i64,
+ ) -> Vec<RecordBatch> {
+ let format_reader = create_format_reader(file_path, false).unwrap();
+ let input = file_io.new_input(file_path).unwrap();
+ let file_reader = input.reader().await.unwrap();
+ let read_fields = physical_key_value_fields();
+ let stream = format_reader
+ .read_batch_stream(
+ Box::new(file_reader),
+ file_size as u64,
+ &read_fields,
+ None,
+ None,
+ None,
+ )
+ .await
+ .unwrap();
+ futures::TryStreamExt::try_collect(stream).await.unwrap()
+ }
+
+ fn collect_i32(batches: &[RecordBatch], column: usize) -> Vec<i32> {
+ batches
+ .iter()
+ .flat_map(|batch| {
+ batch
+ .column(column)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .values()
+ .iter()
+ .copied()
+ })
+ .collect()
+ }
+
+ fn collect_i64(batches: &[RecordBatch], column: usize) -> Vec<i64> {
+ batches
+ .iter()
+ .flat_map(|batch| {
+ batch
+ .column(column)
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .unwrap()
+ .values()
+ .iter()
+ .copied()
+ })
+ .collect()
+ }
+
+ fn collect_i8(batches: &[RecordBatch], column: usize) -> Vec<i8> {
+ batches
+ .iter()
+ .flat_map(|batch| {
+ batch
+ .column(column)
+ .as_any()
+ .downcast_ref::<Int8Array>()
+ .unwrap()
+ .values()
+ .iter()
+ .copied()
+ })
+ .collect()
+ }
+
fn make_partitioned_batch(pts: Vec<&str>, ids: Vec<i32>) -> RecordBatch {
let schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("pt", ArrowDataType::Utf8, false),
@@ -1396,6 +1552,320 @@ mod tests {
)
}
+ fn pk_changelog_schema(options: &[(&str, &str)]) -> TableSchema {
+ let mut builder = Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .primary_key(["id"])
+ .option("bucket", "1");
+ for (key, value) in options {
+ builder = builder.option(*key, *value);
+ }
+ TableSchema::new(0, &builder.build().unwrap())
+ }
+
+ fn ordinary_dynamic_pk_changelog_schema() -> TableSchema {
+ let schema = Schema::builder()
+ .column("pt", DataType::VarChar(VarCharType::string_type()))
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .partition_keys(["pt"])
+ .primary_key(["pt", "id"])
+ .option("changelog-producer", "input")
+ .build()
+ .unwrap();
+ TableSchema::new(0, &schema)
+ }
+
+ #[tokio::test]
+ async fn test_input_changelog_writes_raw_rows_separately_from_data_rows() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_input_changelog_duplicate_pk";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = Table::new(
+ file_io.clone(),
+ Identifier::new("default", "test_input_changelog"),
+ table_path.to_string(),
+ pk_changelog_schema(&[
+ ("changelog-producer", "input"),
+ ("changelog-file.prefix", "custom-changelog-"),
+ ]),
+ None,
+ );
+
+ let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
+ table_write
+ .write_arrow_batch(&make_batch(vec![1, 1], vec![10, 20]))
+ .await
+ .unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ assert_eq!(messages.len(), 1);
+ assert_eq!(messages[0].new_files.len(), 1);
+ assert_eq!(messages[0].new_files[0].row_count, 1);
+ assert_eq!(messages[0].new_changelog_files.len(), 1);
+ assert_eq!(messages[0].new_changelog_files[0].row_count, 2);
+ assert!(messages[0].new_files[0].file_name.starts_with("data-"));
+ assert!(messages[0].new_changelog_files[0]
+ .file_name
+ .starts_with("custom-changelog-"));
+
+ let bucket_dir = bucket_dir_name(messages[0].bucket);
+ let data_file = &messages[0].new_files[0];
+ let data_file_path = format!("{table_path}/{bucket_dir}/{}", data_file.file_name);
+ let data_batches =
+ read_physical_key_value_batches(&file_io, &data_file_path, data_file.file_size).await;
+ assert_eq!(collect_i64(&data_batches, 0), vec![1]);
+ assert_eq!(collect_i8(&data_batches, 1), vec![0]);
+ assert_eq!(collect_i32(&data_batches, 2), vec![1]);
+ assert_eq!(collect_i32(&data_batches, 3), vec![20]);
+
+ let changelog_file = &messages[0].new_changelog_files[0];
+ let changelog_file_path = format!("{table_path}/{bucket_dir}/{}", changelog_file.file_name);
+ let changelog_batches = read_physical_key_value_batches(
+ &file_io,
+ &changelog_file_path,
+ changelog_file.file_size,
+ )
+ .await;
+ assert_eq!(collect_i64(&changelog_batches, 0), vec![0, 1]);
+ assert_eq!(collect_i8(&changelog_batches, 1), vec![0, 0]);
+ assert_eq!(collect_i32(&changelog_batches, 2), vec![1, 1]);
+ assert_eq!(collect_i32(&changelog_batches, 3), vec![10, 20]);
+ }
+
+ #[tokio::test]
+ async fn test_input_changelog_metadata_counts_retract_rows() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_input_changelog_retract_rows";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = Table::new(
+ file_io,
+ Identifier::new("default", "test_input_changelog"),
+ table_path.to_string(),
+ pk_changelog_schema(&[("changelog-producer", "input")]),
+ None,
+ );
+
+ let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
+ table_write
+ .write_arrow_batch(&make_batch_with_value_kind(
+ vec![1, 2, 3],
+ vec![10, 20, 30],
+ vec![0, 1, 3],
+ ))
+ .await
+ .unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ assert_eq!(messages[0].new_files[0].delete_row_count, Some(2));
+ assert_eq!(messages[0].new_changelog_files[0].delete_row_count, Some(2));
+ }
+
+ #[tokio::test]
+ async fn test_input_changelog_rejects_invalid_value_kind() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_input_changelog_invalid_value_kind";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = Table::new(
+ file_io,
+ Identifier::new("default", "test_input_changelog"),
+ table_path.to_string(),
+ pk_changelog_schema(&[("changelog-producer", "input")]),
+ None,
+ );
+
+ let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
+ table_write
+ .write_arrow_batch(&make_batch_with_value_kind(vec![1], vec![10], vec![4]))
+ .await
+ .unwrap();
+
+ let err = table_write.prepare_commit().await.unwrap_err();
+ assert!(
+ matches!(err, crate::Error::DataInvalid { message, .. } if message.contains("Invalid RowKind value"))
+ );
+ }
+
+ #[tokio::test]
+ async fn test_input_changelog_commit_writes_changelog_manifest_metadata() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_input_changelog_commit";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = Table::new(
+ file_io.clone(),
+ Identifier::new("default", "test_input_changelog"),
+ table_path.to_string(),
+ pk_changelog_schema(&[("changelog-producer", "input")]),
+ None,
+ );
+
+ let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
+ table_write
+ .write_arrow_batch(&make_batch(vec![1, 1], vec![10, 20]))
+ .await
+ .unwrap();
+ let messages = table_write.prepare_commit().await.unwrap();
+ let data_file_name = messages[0].new_files[0].file_name.clone();
+ let changelog_file_name = messages[0].new_changelog_files[0].file_name.clone();
+
+ let commit = TableCommit::new(table, "test-user".to_string());
+ commit.commit(messages).await.unwrap();
+
+ let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
+ let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+ assert_eq!(snapshot.total_record_count(), Some(1));
+ assert_eq!(snapshot.delta_record_count(), Some(1));
+ assert_eq!(snapshot.changelog_record_count(), Some(2));
+
+ let manifest_dir = format!("{table_path}/manifest");
+ let delta_metas = ManifestList::read(
+ &file_io,
+ &format!("{manifest_dir}/{}", snapshot.delta_manifest_list()),
+ )
+ .await
+ .unwrap();
+ let delta_entries = Manifest::read(
+ &file_io,
+ &format!("{manifest_dir}/{}", delta_metas[0].file_name()),
+ )
+ .await
+ .unwrap();
+ assert_eq!(delta_entries.len(), 1);
+ assert_eq!(*delta_entries[0].kind(), FileKind::Add);
+ assert_eq!(delta_entries[0].file().file_name, data_file_name);
+
+ let changelog_list = snapshot
+ .changelog_manifest_list()
+ .expect("changelog manifest list");
+ let changelog_metas =
+ ManifestList::read(&file_io, &format!("{manifest_dir}/{changelog_list}"))
+ .await
+ .unwrap();
+ let changelog_entries = Manifest::read(
+ &file_io,
+ &format!("{manifest_dir}/{}", changelog_metas[0].file_name()),
+ )
+ .await
+ .unwrap();
+ assert_eq!(changelog_entries.len(), 1);
+ assert_eq!(*changelog_entries[0].kind(), FileKind::Add);
+ assert_eq!(changelog_entries[0].file().file_name, changelog_file_name);
+ assert_eq!(changelog_entries[0].file().row_count, 2);
+ }
+
+ #[tokio::test]
+ async fn test_input_changelog_dynamic_bucket_commits_data_changelog_and_index() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_input_changelog_dynamic_bucket_commit";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = Table::new(
+ file_io.clone(),
+ Identifier::new("default", "test_input_changelog_dynamic_bucket"),
+ table_path.to_string(),
+ ordinary_dynamic_pk_changelog_schema(),
+ None,
+ );
+
+ let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
+ assert!(matches!(
+ table_write.bucket_assigner,
+ BucketAssignerEnum::Dynamic(_)
+ ));
+ table_write
+ .write_arrow_batch(&make_partitioned_batch_with_value_kind(
+ vec!["a", "a"],
+ vec![1, 2],
+ vec![10, 20],
+ vec![0, 3],
+ ))
+ .await
+ .unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ assert_eq!(messages.len(), 1);
+ assert_eq!(messages[0].new_files.len(), 1);
+ assert_eq!(messages[0].new_files[0].row_count, 2);
+ assert_eq!(messages[0].new_files[0].delete_row_count, Some(1));
+ assert_eq!(messages[0].new_changelog_files.len(), 1);
+ assert_eq!(messages[0].new_changelog_files[0].row_count, 2);
+ assert_eq!(messages[0].new_changelog_files[0].delete_row_count, Some(1));
+ assert_eq!(messages[0].new_index_files.len(), 1);
+ assert_eq!(messages[0].new_index_files[0].index_type, "HASH");
+ assert_eq!(messages[0].new_index_files[0].row_count, 2);
+
+ let index_file_name = messages[0].new_index_files[0].file_name.clone();
+
+ let commit = TableCommit::new(table, "test-user".to_string());
+ commit.commit(messages).await.unwrap();
+
+ let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
+ let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+ assert_eq!(snapshot.total_record_count(), Some(2));
+ assert_eq!(snapshot.delta_record_count(), Some(2));
+ assert_eq!(snapshot.changelog_record_count(), Some(2));
+
+ let manifest_dir = format!("{table_path}/manifest");
+ let changelog_list = snapshot
+ .changelog_manifest_list()
+ .expect("changelog manifest list");
+ let changelog_metas =
+ ManifestList::read(&file_io, &format!("{manifest_dir}/{changelog_list}"))
+ .await
+ .unwrap();
+ let changelog_partition_stats = changelog_metas[0].partition_stats();
+ assert!(!changelog_partition_stats.min_values().is_empty());
+ assert!(!changelog_partition_stats.max_values().is_empty());
+ assert_eq!(
+ changelog_partition_stats.null_counts().as_slice(),
+ &[Some(0)]
+ );
+
+ let index_manifest = snapshot.index_manifest().expect("index manifest");
+ let index_entries =
+ IndexManifest::read(&file_io, &format!("{manifest_dir}/{index_manifest}"))
+ .await
+ .unwrap();
+ assert_eq!(index_entries.len(), 1);
+ assert_eq!(index_entries[0].kind, FileKind::Add);
+ assert_eq!(index_entries[0].index_file.file_name, index_file_name);
+ assert_eq!(index_entries[0].index_file.index_type, "HASH");
+ assert_eq!(index_entries[0].index_file.row_count, 2);
+ }
+
+ #[tokio::test]
+ async fn test_input_changelog_overwrite_does_not_write_changelog_files() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_input_changelog_overwrite";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = Table::new(
+ file_io,
+ Identifier::new("default", "test_input_changelog"),
+ table_path.to_string(),
+ pk_changelog_schema(&[("changelog-producer", "input")]),
+ None,
+ );
+
+ let mut table_write = TableWrite::new(&table, "test-user".to_string())
+ .unwrap()
+ .with_overwrite();
+ table_write
+ .write_arrow_batch(&make_batch(vec![1], vec![10]))
+ .await
+ .unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ assert_eq!(messages.len(), 1);
+ assert_eq!(messages[0].new_files.len(), 1);
+ assert!(messages[0].new_changelog_files.is_empty());
+ }
+
#[tokio::test]
async fn test_pk_write_and_commit() {
let file_io = test_file_io();