This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 29397fc fix: validate first-row changelog producer (#342)
29397fc is described below
commit 29397fc64409d04b60b90b5419eca6663fabaa66
Author: QuakeWang <[email protected]>
AuthorDate: Sun May 24 20:57:20 2026 +0800
fix: validate first-row changelog producer (#342)
---
crates/paimon/src/spec/core_options.rs | 7 ++
crates/paimon/src/spec/schema.rs | 161 ++++++++++++++++++++++++++++++++-
crates/paimon/src/table/table_write.rs | 61 ++++++++++++-
3 files changed, 226 insertions(+), 3 deletions(-)
diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs
index aafdf52..a7adb1d 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -106,6 +106,13 @@ impl ChangelogProducer {
}
}
+/// Returns whether a table using `merge-engine=first-row` may be configured
+/// with `producer` as its changelog producer.
+///
+/// Only `ChangelogProducer::None` and `ChangelogProducer::Lookup` are accepted;
+/// every other producer yields `false`.
+pub(crate) fn first_row_supports_changelog_producer(producer: ChangelogProducer) -> bool {
+    matches!(producer, ChangelogProducer::Lookup | ChangelogProducer::None)
+}
+
/// Format the bucket directory name for a given bucket number.
/// Returns `"bucket-postpone"` for `POSTPONE_BUCKET` (-2), otherwise
`"bucket-{N}"`.
pub fn bucket_dir_name(bucket: i32) -> String {
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index e6d5af9..76d09b4 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::spec::core_options::CoreOptions;
+use crate::spec::core_options::{first_row_supports_changelog_producer,
CoreOptions};
use crate::spec::types::{ArrayType, DataType, MapType, MultisetType, RowType};
use crate::spec::PartialUpdateConfig;
use serde::{Deserialize, Serialize};
@@ -148,6 +148,7 @@ impl TableSchema {
}
}
+ Schema::validate_first_row_changelog_producer(&new_schema.options)?;
Ok(new_schema)
}
@@ -263,6 +264,7 @@ pub fn escape_single_quotes(text: &str) -> String {
pub const PRIMARY_KEY_OPTION: &str = "primary-key";
/// Option key for partition in table options (same as
[CoreOptions.PARTITION](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java)).
pub const PARTITION_OPTION: &str = "partition";
+/// Option key for the merge engine in table options; used to detect
+/// `merge-engine=first-row` during schema validation.
+const MERGE_ENGINE_OPTION: &str = "merge-engine";
/// Schema of a table (logical DDL schema).
///
@@ -291,6 +293,7 @@ impl Schema {
let fields = Self::normalize_fields(&fields, &partition_keys,
&primary_keys)?;
Self::validate_blob_fields(&fields, &partition_keys, &options)?;
PartialUpdateConfig::new(&options).validate_create_mode(!primary_keys.is_empty())?;
+ Self::validate_first_row_changelog_producer(&options)?;
Ok(Self {
fields,
@@ -506,6 +509,40 @@ impl Schema {
Ok(())
}
+    /// Rejects table options that pair `merge-engine=first-row` with a changelog
+    /// producer other than `none` or `lookup`.
+    ///
+    /// The merge-engine value is compared ASCII case-insensitively. Options whose
+    /// merge engine is absent or anything other than `first-row` pass untouched.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::ConfigInvalid` when the changelog producer cannot be parsed
+    /// (converted via `options_error_to_config_invalid`) or is unsupported for
+    /// first-row tables.
+    fn validate_first_row_changelog_producer(
+        options: &HashMap<String, String>,
+    ) -> crate::Result<()> {
+        let uses_first_row = matches!(
+            options.get(MERGE_ENGINE_OPTION),
+            Some(engine) if engine.eq_ignore_ascii_case("first-row")
+        );
+        if !uses_first_row {
+            return Ok(());
+        }
+
+        let producer = match CoreOptions::new(options).try_changelog_producer() {
+            Ok(producer) => producer,
+            Err(error) => return Err(Self::options_error_to_config_invalid(error)),
+        };
+
+        if first_row_supports_changelog_producer(producer) {
+            Ok(())
+        } else {
+            Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "merge-engine=first-row only supports changelog-producer=none or lookup, but found changelog-producer={}",
+                    producer.as_str()
+                ),
+            })
+        }
+    }
+
+ fn options_error_to_config_invalid(error: crate::Error) -> crate::Error {
+ match error {
+ crate::Error::Unsupported { message } =>
crate::Error::ConfigInvalid { message },
+ other => crate::Error::ConfigInvalid {
+ message: other.to_string(),
+ },
+ }
+ }
+
/// Returns top-level Blob field names for create-time Blob contract
checks.
fn top_level_blob_field_names(fields: &[DataField]) -> Vec<&str> {
fields
@@ -978,6 +1015,128 @@ mod tests {
}
}
+    /// `Schema::build` must accept first-row tables whose changelog producer is
+    /// one of the supported values.
+    #[test]
+    fn test_first_row_schema_validation_accepts_supported_changelog_producers() {
+        let supported = ["none", "lookup"];
+        supported.iter().for_each(|producer| {
+            let built = Schema::builder()
+                .column("id", DataType::Int(IntType::new()))
+                .column("value", DataType::Int(IntType::new()))
+                .primary_key(["id"])
+                .option("merge-engine", "first-row")
+                .option("changelog-producer", *producer)
+                .build();
+            built.unwrap();
+        });
+    }
+
+    /// `Schema::build` must reject first-row tables configured with an
+    /// unsupported changelog producer, naming the conflicting options.
+    #[test]
+    fn test_first_row_schema_validation_rejects_incompatible_changelog_producers() {
+        for producer in ["input", "full-compaction"] {
+            let build_result = Schema::builder()
+                .column("id", DataType::Int(IntType::new()))
+                .column("value", DataType::Int(IntType::new()))
+                .primary_key(["id"])
+                .option("merge-engine", "first-row")
+                .option("changelog-producer", producer)
+                .build();
+            let err = build_result.unwrap_err();
+
+            let names_conflict = matches!(
+                &err,
+                crate::Error::ConfigInvalid { message }
+                    if ["merge-engine=first-row", "changelog-producer", producer]
+                        .iter()
+                        .all(|needle| message.contains(*needle))
+            );
+            assert!(
+                names_conflict,
+                "first-row should reject changelog-producer={producer}, got {err:?}"
+            );
+        }
+    }
+
+    /// Altering a valid first-row table to an unsupported changelog producer via
+    /// `apply_changes` must fail with `ConfigInvalid`.
+    #[test]
+    fn test_first_row_apply_changes_rejects_incompatible_changelog_producers() {
+        let base_schema = Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .primary_key(["id"])
+            .option("merge-engine", "first-row")
+            .option("changelog-producer", "lookup")
+            .build()
+            .unwrap();
+        let table_schema = TableSchema::new(0, &base_schema);
+
+        for producer in ["input", "full-compaction"] {
+            let change = crate::spec::SchemaChange::set_option(
+                "changelog-producer".to_string(),
+                producer.to_string(),
+            );
+            let err = table_schema.apply_changes(vec![change]).unwrap_err();
+
+            let names_conflict = matches!(
+                &err,
+                crate::Error::ConfigInvalid { message }
+                    if ["merge-engine=first-row", "changelog-producer", producer]
+                        .iter()
+                        .all(|needle| message.contains(*needle))
+            );
+            assert!(
+                names_conflict,
+                "first-row alter should reject changelog-producer={producer}, got {err:?}"
+            );
+        }
+    }
+
+    /// `apply_changes` must validate the combined options after every change is
+    /// applied, not each change in isolation.
+    #[test]
+    fn test_first_row_apply_changes_validates_final_options() {
+        let set_option = |key: &str, value: &str| {
+            crate::spec::SchemaChange::set_option(key.to_string(), value.to_string())
+        };
+        let base_schema = Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .primary_key(["id"])
+            .option("changelog-producer", "input")
+            .build()
+            .unwrap();
+        let table_schema = TableSchema::new(0, &base_schema);
+
+        // Switching only the merge engine leaves changelog-producer=input in place,
+        // a combination first-row does not support.
+        let err = table_schema
+            .apply_changes(vec![set_option("merge-engine", "first-row")])
+            .unwrap_err();
+        let names_conflict = matches!(
+            &err,
+            crate::Error::ConfigInvalid { message }
+                if ["merge-engine=first-row", "changelog-producer", "input"]
+                    .iter()
+                    .all(|needle| message.contains(*needle))
+        );
+        assert!(
+            names_conflict,
+            "first-row alter should reject incompatible final options, got {err:?}"
+        );
+
+        // Changing both options in one batch produces a supported final combination.
+        let new_schema = table_schema
+            .apply_changes(vec![
+                set_option("merge-engine", "first-row"),
+                set_option("changelog-producer", "lookup"),
+            ])
+            .unwrap();
+        let options = new_schema.options();
+        let expected = [("merge-engine", "first-row"), ("changelog-producer", "lookup")];
+        for (key, value) in expected {
+            assert_eq!(options.get(key).map(String::as_str), Some(value));
+        }
+    }
+
#[test]
fn test_schema_builder_column_row_type() {
let row_type = RowType::new(vec![DataField::new(
diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs
index f34c124..fea6ff7 100644
--- a/crates/paimon/src/table/table_write.rs
+++ b/crates/paimon/src/table/table_write.rs
@@ -23,8 +23,8 @@
use crate::arrow::build_target_arrow_schema;
use crate::spec::PartitionComputer;
use crate::spec::{
- BinaryRow, ChangelogProducer, CoreOptions, DataType, MergeEngine,
EMPTY_SERIALIZED_ROW,
- POSTPONE_BUCKET,
+ first_row_supports_changelog_producer, BinaryRow, ChangelogProducer,
CoreOptions, DataType,
+ MergeEngine, EMPTY_SERIALIZED_ROW, POSTPONE_BUCKET,
};
use crate::table::blob_file_writer::AppendBlobFileWriter;
use crate::table::bucket_assigner::{BucketAssignerEnum, PartitionBucketKey};
@@ -224,6 +224,18 @@ impl TableWrite {
let merge_engine = core_options.merge_engine()?;
+ if merge_engine == MergeEngine::FirstRow
+ && !first_row_supports_changelog_producer(changelog_producer)
+ {
+ return Err(crate::Error::Unsupported {
+ message: format!(
+ "Table '{}' has incompatible table options:
merge-engine=first-row only supports changelog-producer=none or lookup, but
found changelog-producer={}",
+ table.identifier().full_name(),
+ changelog_producer.as_str()
+ ),
+ });
+ }
+
if is_dynamic_cross_partition && merge_engine ==
MergeEngine::PartialUpdate {
return Err(crate::Error::Unsupported {
message:
@@ -1564,6 +1576,21 @@ mod tests {
TableSchema::new(0, &builder.build().unwrap())
}
+    /// Builds a first-row `TableSchema` whose `changelog-producer` option is forced
+    /// to `producer`.
+    ///
+    /// The schema is first built with the supported `lookup` producer. It is then
+    /// round-tripped through JSON with the option overwritten, so the result can
+    /// carry a combination that `Schema::build` would reject.
+    fn loaded_first_row_schema_with_changelog_producer(producer: &str) -> TableSchema {
+        let valid_schema = Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .primary_key(["id"])
+            .option("merge-engine", "first-row")
+            .option("changelog-producer", "lookup")
+            .build()
+            .unwrap();
+        let mut json = serde_json::to_value(TableSchema::new(0, &valid_schema)).unwrap();
+        json["options"]["changelog-producer"] = serde_json::Value::from(producer);
+        serde_json::from_value(json).unwrap()
+    }
+
fn ordinary_dynamic_pk_changelog_schema() -> TableSchema {
let schema = Schema::builder()
.column("pt", DataType::VarChar(VarCharType::string_type()))
@@ -1577,6 +1604,36 @@ mod tests {
TableSchema::new(0, &schema)
}
+    /// `TableWrite::new` must refuse a first-row table whose stored options carry an
+    /// unsupported changelog producer, even though schema validation was bypassed.
+    #[test]
+    fn test_table_write_rejects_loaded_first_row_with_incompatible_changelog_producer() {
+        let file_io = test_file_io();
+
+        for producer in ["input", "full-compaction"] {
+            let schema = loaded_first_row_schema_with_changelog_producer(producer);
+            let table = Table::new(
+                file_io.clone(),
+                Identifier::new("default", "test_first_row_changelog"),
+                "memory:/test_first_row_changelog".to_string(),
+                schema,
+                None,
+            );
+
+            let Err(err) = TableWrite::new(&table, "test-user".to_string()) else {
+                panic!("first-row should reject changelog-producer={producer} during write setup");
+            };
+
+            let rejected_at_runtime = matches!(
+                &err,
+                crate::Error::Unsupported { message }
+                    if ["incompatible table options", "merge-engine=first-row", producer]
+                        .iter()
+                        .all(|needle| message.contains(*needle))
+            );
+            assert!(
+                rejected_at_runtime,
+                "first-row runtime guard should reject changelog-producer={producer}, got {err:?}"
+            );
+        }
+    }
+
#[tokio::test]
async fn test_input_changelog_writes_raw_rows_separately_from_data_rows() {
let file_io = test_file_io();