This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new a53293c  test: add mixed-format schema evolution coverage (#354)
a53293c is described below

commit a53293cbe19f6bf5a9479590ffd9e681c1e837bc
Author: QuakeWang <[email protected]>
AuthorDate: Mon Jun 8 10:06:18 2026 +0800

    test: add mixed-format schema evolution coverage (#354)
---
 crates/integration_tests/tests/read_tables.rs | 122 ++++++++++++++++++++++++++
 dev/spark/provision.py                        |  54 ++++++++++++
 2 files changed, 176 insertions(+)

diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs
index fdf213a..d0766c3 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -1152,6 +1152,128 @@ async fn test_read_schema_evolution_type_promotion() {
     );
 }
 
+fn assert_plan_file_formats(plan: &Plan, expected_formats: &[&str], table_name: &str) {
+    let formats: HashSet<&str> = plan
+        .splits()
+        .iter()
+        .flat_map(|split| split.data_files())
+        .filter_map(|file| file.file_name.rsplit_once('.').map(|(_, ext)| ext))
+        .collect();
+    assert_eq!(
+        formats,
+        expected_formats.iter().copied().collect(),
+        "{table_name} should scan the expected data file formats"
+    );
+}
+
+fn assert_plan_has_multiple_schema_ids(plan: &Plan, table_name: &str) {
+    let schema_ids: HashSet<i64> = plan
+        .splits()
+        .iter()
+        .flat_map(|split| split.data_files())
+        .map(|file| file.schema_id)
+        .collect();
+    assert!(
+        schema_ids.len() >= 2,
+        "{table_name} should scan files from multiple schema versions, got {schema_ids:?}"
+    );
+}
+
+/// Test reading mixed-format files after ALTER TABLE ADD COLUMNS.
+/// Old Parquet files lack the new column; newer ORC/Avro files contain it.
+#[tokio::test]
+async fn test_read_format_schema_evolution_add_column() {
+    let table_name = "format_schema_evolution_add_column";
+    let (plan, batches) = scan_and_read_with_fs_catalog(table_name, None).await;
+    assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);
+    assert_plan_has_multiple_schema_ids(&plan, table_name);
+
+    let mut rows: Vec<(i32, String, Option<i32>)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let name = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("name");
+        let age = batch
+            .column_by_name("age")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("age");
+        for i in 0..batch.num_rows() {
+            rows.push((
+                id.value(i),
+                name.value(i).to_string(),
+                (!age.is_null(i)).then(|| age.value(i)),
+            ));
+        }
+    }
+    rows.sort_by_key(|(id, _, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice".into(), None),
+            (2, "bob".into(), None),
+            (3, "carol".into(), Some(30)),
+            (4, "dave".into(), Some(40)),
+            (5, "eve".into(), Some(50)),
+            (6, "frank".into(), Some(60)),
+        ],
+        "Old Parquet rows should have null age and new ORC/Avro rows should keep age values"
+    );
+}
+
+/// Test reading mixed-format files after ALTER TABLE ALTER COLUMN TYPE (INT -> BIGINT).
+/// Old Parquet files have INT; newer ORC/Avro files have BIGINT.
+#[tokio::test]
+async fn test_read_format_schema_evolution_type_promotion() {
+    let table_name = "format_schema_evolution_type_promotion";
+    let (plan, batches) = scan_and_read_with_fs_catalog(table_name, None).await;
+    assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);
+    assert_plan_has_multiple_schema_ids(&plan, table_name);
+
+    for batch in &batches {
+        let value_col = batch.column_by_name("value").expect("value column");
+        assert_eq!(
+            value_col.data_type(),
+            &arrow_array::types::Int64Type::DATA_TYPE,
+            "value column should be Int64 after mixed-format type promotion"
+        );
+    }
+
+    let mut rows: Vec<(i32, i64)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let value = batch
+            .column_by_name("value")
+            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
+            .expect("value as Int64Array");
+        for i in 0..batch.num_rows() {
+            rows.push((id.value(i), value.value(i)));
+        }
+    }
+    rows.sort_by_key(|(id, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, 100),
+            (2, 200),
+            (3, 3_000_000_000),
+            (4, 4_000_000_000),
+            (5, 5_000_000_000),
+            (6, 6_000_000_000),
+        ],
+        "Old Parquet INT rows should be cast to BIGINT and new ORC/Avro BIGINT rows should match"
+    );
+}
+
 /// Stats pruning should treat a newly added column as all-NULL for old files.
 #[tokio::test]
 async fn test_stats_pruning_schema_evolution_added_column_eq_prunes_old_files() {
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index c7c408d..1a948c6 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -370,6 +370,60 @@ def main():
         "INSERT INTO schema_evolution_type_promotion VALUES (3, 3000000000)"
     )
 
+    # ===== Mixed-format Schema Evolution: Add Column =====
+    # Old Parquet files lack age; new ORC/Avro files contain age.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS format_schema_evolution_add_column (
+            id INT,
+            name STRING
+        ) USING paimon
+        TBLPROPERTIES (
+            'file.format' = 'parquet'
+        )
+        """
+    )
+    spark.sql(
+        "INSERT INTO format_schema_evolution_add_column VALUES (1, 'alice'), (2, 'bob')"
+    )
+    spark.sql("ALTER TABLE format_schema_evolution_add_column ADD COLUMNS (age INT)")
+    spark.sql("ALTER TABLE format_schema_evolution_add_column SET TBLPROPERTIES ('file.format' = 'orc')")
+    spark.sql(
+        "INSERT INTO format_schema_evolution_add_column VALUES (3, 'carol', 30), (4, 'dave', 40)"
+    )
+    spark.sql("ALTER TABLE format_schema_evolution_add_column SET TBLPROPERTIES ('file.format' = 'avro')")
+    spark.sql(
+        "INSERT INTO format_schema_evolution_add_column VALUES (5, 'eve', 50), (6, 'frank', 60)"
+    )
+
+    # ===== Mixed-format Schema Evolution: Type Promotion (INT -> BIGINT) =====
+    # Old Parquet files have value as INT; new ORC/Avro files have value as BIGINT.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS format_schema_evolution_type_promotion (
+            id INT,
+            value INT
+        ) USING paimon
+        TBLPROPERTIES (
+            'file.format' = 'parquet'
+        )
+        """
+    )
+    spark.sql(
+        "INSERT INTO format_schema_evolution_type_promotion VALUES (1, 100), (2, 200)"
+    )
+    spark.sql(
+        "ALTER TABLE format_schema_evolution_type_promotion ALTER COLUMN value TYPE BIGINT"
+    )
+    spark.sql("ALTER TABLE format_schema_evolution_type_promotion SET TBLPROPERTIES ('file.format' = 'orc')")
+    spark.sql(
+        "INSERT INTO format_schema_evolution_type_promotion VALUES (3, 3000000000), (4, 4000000000)"
+    )
+    spark.sql("ALTER TABLE format_schema_evolution_type_promotion SET TBLPROPERTIES ('file.format' = 'avro')")
+    spark.sql(
+        "INSERT INTO format_schema_evolution_type_promotion VALUES (5, 5000000000), (6, 6000000000)"
+    )
+
     # ===== Data Evolution + Schema Evolution: Add Column =====
     # Combines data-evolution (row-tracking + MERGE INTO) with ALTER TABLE ADD COLUMNS.
     # Old files lack the new column; MERGE INTO produces partial-column files.

Reply via email to