This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 29397fc  fix: validate first-row changelog producer (#342)
29397fc is described below

commit 29397fc64409d04b60b90b5419eca6663fabaa66
Author: QuakeWang <[email protected]>
AuthorDate: Sun May 24 20:57:20 2026 +0800

    fix: validate first-row changelog producer (#342)
---
 crates/paimon/src/spec/core_options.rs |   7 ++
 crates/paimon/src/spec/schema.rs       | 161 ++++++++++++++++++++++++++++++++-
 crates/paimon/src/table/table_write.rs |  61 ++++++++++++-
 3 files changed, 226 insertions(+), 3 deletions(-)

diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs
index aafdf52..a7adb1d 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -106,6 +106,13 @@ impl ChangelogProducer {
     }
 }
 
+pub(crate) fn first_row_supports_changelog_producer(producer: 
ChangelogProducer) -> bool {
+    matches!(
+        producer,
+        ChangelogProducer::None | ChangelogProducer::Lookup
+    )
+}
+
 /// Format the bucket directory name for a given bucket number.
 /// Returns `"bucket-postpone"` for `POSTPONE_BUCKET` (-2), otherwise `"bucket-{N}"`.
 pub fn bucket_dir_name(bucket: i32) -> String {
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index e6d5af9..76d09b4 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::spec::core_options::CoreOptions;
+use crate::spec::core_options::{first_row_supports_changelog_producer, 
CoreOptions};
 use crate::spec::types::{ArrayType, DataType, MapType, MultisetType, RowType};
 use crate::spec::PartialUpdateConfig;
 use serde::{Deserialize, Serialize};
@@ -148,6 +148,7 @@ impl TableSchema {
             }
         }
 
+        Schema::validate_first_row_changelog_producer(&new_schema.options)?;
         Ok(new_schema)
     }
 
@@ -263,6 +264,7 @@ pub fn escape_single_quotes(text: &str) -> String {
 pub const PRIMARY_KEY_OPTION: &str = "primary-key";
 /// Option key for partition in table options (same as [CoreOptions.PARTITION](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java)).
 pub const PARTITION_OPTION: &str = "partition";
+const MERGE_ENGINE_OPTION: &str = "merge-engine";
 
 /// Schema of a table (logical DDL schema).
 ///
@@ -291,6 +293,7 @@ impl Schema {
         let fields = Self::normalize_fields(&fields, &partition_keys, 
&primary_keys)?;
         Self::validate_blob_fields(&fields, &partition_keys, &options)?;
         
PartialUpdateConfig::new(&options).validate_create_mode(!primary_keys.is_empty())?;
+        Self::validate_first_row_changelog_producer(&options)?;
 
         Ok(Self {
             fields,
@@ -506,6 +509,40 @@ impl Schema {
         Ok(())
     }
 
+    fn validate_first_row_changelog_producer(
+        options: &HashMap<String, String>,
+    ) -> crate::Result<()> {
+        if !options
+            .get(MERGE_ENGINE_OPTION)
+            .is_some_and(|value| value.eq_ignore_ascii_case("first-row"))
+        {
+            return Ok(());
+        }
+
+        let changelog_producer = CoreOptions::new(options)
+            .try_changelog_producer()
+            .map_err(Self::options_error_to_config_invalid)?;
+        if first_row_supports_changelog_producer(changelog_producer) {
+            return Ok(());
+        }
+
+        Err(crate::Error::ConfigInvalid {
+            message: format!(
+                "merge-engine=first-row only supports changelog-producer=none 
or lookup, but found changelog-producer={}",
+                changelog_producer.as_str()
+            ),
+        })
+    }
+
+    fn options_error_to_config_invalid(error: crate::Error) -> crate::Error {
+        match error {
+            crate::Error::Unsupported { message } => 
crate::Error::ConfigInvalid { message },
+            other => crate::Error::ConfigInvalid {
+                message: other.to_string(),
+            },
+        }
+    }
+
     /// Returns top-level Blob field names for create-time Blob contract checks.
     fn top_level_blob_field_names(fields: &[DataField]) -> Vec<&str> {
         fields
@@ -978,6 +1015,128 @@ mod tests {
         }
     }
 
+    #[test]
+    fn 
test_first_row_schema_validation_accepts_supported_changelog_producers() {
+        for producer in ["none", "lookup"] {
+            Schema::builder()
+                .column("id", DataType::Int(IntType::new()))
+                .column("value", DataType::Int(IntType::new()))
+                .primary_key(["id"])
+                .option("merge-engine", "first-row")
+                .option("changelog-producer", producer)
+                .build()
+                .unwrap();
+        }
+    }
+
+    #[test]
+    fn 
test_first_row_schema_validation_rejects_incompatible_changelog_producers() {
+        for producer in ["input", "full-compaction"] {
+            let err = Schema::builder()
+                .column("id", DataType::Int(IntType::new()))
+                .column("value", DataType::Int(IntType::new()))
+                .primary_key(["id"])
+                .option("merge-engine", "first-row")
+                .option("changelog-producer", producer)
+                .build()
+                .unwrap_err();
+
+            assert!(
+                matches!(err, crate::Error::ConfigInvalid { ref message }
+                    if message.contains("merge-engine=first-row")
+                        && message.contains("changelog-producer")
+                        && message.contains(producer)),
+                "first-row should reject changelog-producer={producer}, got 
{err:?}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_first_row_apply_changes_rejects_incompatible_changelog_producers() 
{
+        let table_schema = TableSchema::new(
+            0,
+            &Schema::builder()
+                .column("id", DataType::Int(IntType::new()))
+                .column("value", DataType::Int(IntType::new()))
+                .primary_key(["id"])
+                .option("merge-engine", "first-row")
+                .option("changelog-producer", "lookup")
+                .build()
+                .unwrap(),
+        );
+
+        for producer in ["input", "full-compaction"] {
+            let err = table_schema
+                .apply_changes(vec![crate::spec::SchemaChange::set_option(
+                    "changelog-producer".to_string(),
+                    producer.to_string(),
+                )])
+                .unwrap_err();
+
+            assert!(
+                matches!(err, crate::Error::ConfigInvalid { ref message }
+                    if message.contains("merge-engine=first-row")
+                        && message.contains("changelog-producer")
+                        && message.contains(producer)),
+                "first-row alter should reject changelog-producer={producer}, 
got {err:?}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_first_row_apply_changes_validates_final_options() {
+        let table_schema = TableSchema::new(
+            0,
+            &Schema::builder()
+                .column("id", DataType::Int(IntType::new()))
+                .column("value", DataType::Int(IntType::new()))
+                .primary_key(["id"])
+                .option("changelog-producer", "input")
+                .build()
+                .unwrap(),
+        );
+
+        let err = table_schema
+            .apply_changes(vec![crate::spec::SchemaChange::set_option(
+                "merge-engine".to_string(),
+                "first-row".to_string(),
+            )])
+            .unwrap_err();
+
+        assert!(
+            matches!(err, crate::Error::ConfigInvalid { ref message }
+                if message.contains("merge-engine=first-row")
+                    && message.contains("changelog-producer")
+                    && message.contains("input")),
+            "first-row alter should reject incompatible final options, got 
{err:?}"
+        );
+
+        let new_schema = table_schema
+            .apply_changes(vec![
+                crate::spec::SchemaChange::set_option(
+                    "merge-engine".to_string(),
+                    "first-row".to_string(),
+                ),
+                crate::spec::SchemaChange::set_option(
+                    "changelog-producer".to_string(),
+                    "lookup".to_string(),
+                ),
+            ])
+            .unwrap();
+
+        assert_eq!(
+            new_schema.options().get("merge-engine").map(String::as_str),
+            Some("first-row")
+        );
+        assert_eq!(
+            new_schema
+                .options()
+                .get("changelog-producer")
+                .map(String::as_str),
+            Some("lookup")
+        );
+    }
+
     #[test]
     fn test_schema_builder_column_row_type() {
         let row_type = RowType::new(vec![DataField::new(
diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs
index f34c124..fea6ff7 100644
--- a/crates/paimon/src/table/table_write.rs
+++ b/crates/paimon/src/table/table_write.rs
@@ -23,8 +23,8 @@
 use crate::arrow::build_target_arrow_schema;
 use crate::spec::PartitionComputer;
 use crate::spec::{
-    BinaryRow, ChangelogProducer, CoreOptions, DataType, MergeEngine, 
EMPTY_SERIALIZED_ROW,
-    POSTPONE_BUCKET,
+    first_row_supports_changelog_producer, BinaryRow, ChangelogProducer, 
CoreOptions, DataType,
+    MergeEngine, EMPTY_SERIALIZED_ROW, POSTPONE_BUCKET,
 };
 use crate::table::blob_file_writer::AppendBlobFileWriter;
 use crate::table::bucket_assigner::{BucketAssignerEnum, PartitionBucketKey};
@@ -224,6 +224,18 @@ impl TableWrite {
 
         let merge_engine = core_options.merge_engine()?;
 
+        if merge_engine == MergeEngine::FirstRow
+            && !first_row_supports_changelog_producer(changelog_producer)
+        {
+            return Err(crate::Error::Unsupported {
+                message: format!(
+                    "Table '{}' has incompatible table options: 
merge-engine=first-row only supports changelog-producer=none or lookup, but 
found changelog-producer={}",
+                    table.identifier().full_name(),
+                    changelog_producer.as_str()
+                ),
+            });
+        }
+
         if is_dynamic_cross_partition && merge_engine == 
MergeEngine::PartialUpdate {
             return Err(crate::Error::Unsupported {
                 message:
@@ -1564,6 +1576,21 @@ mod tests {
         TableSchema::new(0, &builder.build().unwrap())
     }
 
+    fn loaded_first_row_schema_with_changelog_producer(producer: &str) -> 
TableSchema {
+        let schema = Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .primary_key(["id"])
+            .option("merge-engine", "first-row")
+            .option("changelog-producer", "lookup")
+            .build()
+            .unwrap();
+        let table_schema = TableSchema::new(0, &schema);
+        let mut value = serde_json::to_value(&table_schema).unwrap();
+        value["options"]["changelog-producer"] = 
serde_json::Value::String(producer.to_string());
+        serde_json::from_value(value).unwrap()
+    }
+
     fn ordinary_dynamic_pk_changelog_schema() -> TableSchema {
         let schema = Schema::builder()
             .column("pt", DataType::VarChar(VarCharType::string_type()))
@@ -1577,6 +1604,36 @@ mod tests {
         TableSchema::new(0, &schema)
     }
 
+    #[test]
+    fn 
test_table_write_rejects_loaded_first_row_with_incompatible_changelog_producer()
 {
+        let file_io = test_file_io();
+
+        for producer in ["input", "full-compaction"] {
+            let table = Table::new(
+                file_io.clone(),
+                Identifier::new("default", "test_first_row_changelog"),
+                "memory:/test_first_row_changelog".to_string(),
+                loaded_first_row_schema_with_changelog_producer(producer),
+                None,
+            );
+
+            let err = match TableWrite::new(&table, "test-user".to_string()) {
+                Ok(_) => panic!(
+                    "first-row should reject changelog-producer={producer} 
during write setup"
+                ),
+                Err(err) => err,
+            };
+
+            assert!(
+                matches!(err, crate::Error::Unsupported { ref message }
+                    if message.contains("incompatible table options")
+                        && message.contains("merge-engine=first-row")
+                        && message.contains(producer)),
+                "first-row runtime guard should reject 
changelog-producer={producer}, got {err:?}"
+            );
+        }
+    }
+
     #[tokio::test]
     async fn test_input_changelog_writes_raw_rows_separately_from_data_rows() {
         let file_io = test_file_io();

Reply via email to