This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new a53293c test: add mixed-format schema evolution coverage (#354)
a53293c is described below
commit a53293cbe19f6bf5a9479590ffd9e681c1e837bc
Author: QuakeWang <[email protected]>
AuthorDate: Mon Jun 8 10:06:18 2026 +0800
test: add mixed-format schema evolution coverage (#354)
---
crates/integration_tests/tests/read_tables.rs | 122 ++++++++++++++++++++++++++
dev/spark/provision.py | 54 ++++++++++++
2 files changed, 176 insertions(+)
diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs
index fdf213a..d0766c3 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -1152,6 +1152,128 @@ async fn test_read_schema_evolution_type_promotion() {
);
}
+/// Asserts that the data files referenced by `plan` use exactly the
+/// `expected_formats` extensions (the text after the last `.` of each file name).
+///
+/// File names without a `.` contribute no extension. `table_name` only labels
+/// the failure message.
+fn assert_plan_file_formats(plan: &Plan, expected_formats: &[&str], table_name: &str) {
+    let formats: HashSet<&str> = plan
+        .splits()
+        .iter()
+        .flat_map(|split| split.data_files())
+        .filter_map(|file| file.file_name.rsplit_once('.').map(|(_, ext)| ext))
+        .collect();
+    // Compare as sets so neither file order nor repeated formats matter.
+    let expected: HashSet<&str> = expected_formats.iter().copied().collect();
+    assert_eq!(
+        formats, expected,
+        "{table_name} should scan the expected data file formats"
+    );
+}
+
+/// Asserts that the data files referenced by `plan` carry at least two
+/// distinct `schema_id` values.
+///
+/// `table_name` only labels the failure message.
+fn assert_plan_has_multiple_schema_ids(plan: &Plan, table_name: &str) {
+    let schema_ids: HashSet<i64> = plan
+        .splits()
+        .iter()
+        .flat_map(|split| split.data_files())
+        .map(|file| file.schema_id)
+        .collect();
+    assert!(
+        schema_ids.len() >= 2,
+        "{table_name} should scan files from multiple schema versions, got {schema_ids:?}"
+    );
+}
+
+/// Test reading mixed-format files after ALTER TABLE ADD COLUMNS.
+/// Old Parquet files lack the new column; newer ORC/Avro files contain it.
+#[tokio::test]
+async fn test_read_format_schema_evolution_add_column() {
+    let table_name = "format_schema_evolution_add_column";
+    let (plan, batches) = scan_and_read_with_fs_catalog(table_name, None).await;
+    // Guard the premise first: the scan must really span all three formats and
+    // more than one schema version, otherwise the row checks prove nothing.
+    assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);
+    assert_plan_has_multiple_schema_ids(&plan, table_name);
+
+    let mut rows: Vec<(i32, String, Option<i32>)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let name = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("name");
+        let age = batch
+            .column_by_name("age")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("age");
+        for i in 0..batch.num_rows() {
+            rows.push((
+                id.value(i),
+                name.value(i).to_string(),
+                // Keep NULL distinguishable from a real value.
+                (!age.is_null(i)).then(|| age.value(i)),
+            ));
+        }
+    }
+    // Sort by id so the comparison does not depend on batch or file order.
+    rows.sort_by_key(|(id, _, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice".into(), None),
+            (2, "bob".into(), None),
+            (3, "carol".into(), Some(30)),
+            (4, "dave".into(), Some(40)),
+            (5, "eve".into(), Some(50)),
+            (6, "frank".into(), Some(60)),
+        ],
+        "Old Parquet rows should have null age and new ORC/Avro rows should keep age values"
+    );
+}
+
+/// Test reading mixed-format files after ALTER TABLE ALTER COLUMN TYPE (INT
-> BIGINT).
+/// Old Parquet files have INT; newer ORC/Avro files have BIGINT.
+#[tokio::test]
+async fn test_read_format_schema_evolution_type_promotion() {
+ let table_name = "format_schema_evolution_type_promotion";
+ let (plan, batches) = scan_and_read_with_fs_catalog(table_name,
None).await;
+ assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table_name);
+ assert_plan_has_multiple_schema_ids(&plan, table_name);
+
+ for batch in &batches {
+ let value_col = batch.column_by_name("value").expect("value column");
+ assert_eq!(
+ value_col.data_type(),
+ &arrow_array::types::Int64Type::DATA_TYPE,
+ "value column should be Int64 after mixed-format type promotion"
+ );
+ }
+
+ let mut rows: Vec<(i32, i64)> = Vec::new();
+ for batch in &batches {
+ let id = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("id");
+ let value = batch
+ .column_by_name("value")
+ .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
+ .expect("value as Int64Array");
+ for i in 0..batch.num_rows() {
+ rows.push((id.value(i), value.value(i)));
+ }
+ }
+ rows.sort_by_key(|(id, _)| *id);
+
+ assert_eq!(
+ rows,
+ vec![
+ (1, 100),
+ (2, 200),
+ (3, 3_000_000_000),
+ (4, 4_000_000_000),
+ (5, 5_000_000_000),
+ (6, 6_000_000_000),
+ ],
+ "Old Parquet INT rows should be cast to BIGINT and new ORC/Avro BIGINT
rows should match"
+ );
+}
+
/// Stats pruning should treat a newly added column as all-NULL for old files.
#[tokio::test]
async fn
test_stats_pruning_schema_evolution_added_column_eq_prunes_old_files() {
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index c7c408d..1a948c6 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -370,6 +370,60 @@ def main():
"INSERT INTO schema_evolution_type_promotion VALUES (3, 3000000000)"
)
+ # ===== Mixed-format Schema Evolution: Add Column =====
+ # Old Parquet files lack age; new ORC/Avro files contain age.
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS format_schema_evolution_add_column (
+ id INT,
+ name STRING
+ ) USING paimon
+ TBLPROPERTIES (
+ 'file.format' = 'parquet'
+ )
+ """
+ )
+ spark.sql(
+ "INSERT INTO format_schema_evolution_add_column VALUES (1, 'alice'),
(2, 'bob')"
+ )
+ spark.sql("ALTER TABLE format_schema_evolution_add_column ADD COLUMNS (age
INT)")
+ spark.sql("ALTER TABLE format_schema_evolution_add_column SET
TBLPROPERTIES ('file.format' = 'orc')")
+ spark.sql(
+ "INSERT INTO format_schema_evolution_add_column VALUES (3, 'carol',
30), (4, 'dave', 40)"
+ )
+ spark.sql("ALTER TABLE format_schema_evolution_add_column SET
TBLPROPERTIES ('file.format' = 'avro')")
+ spark.sql(
+ "INSERT INTO format_schema_evolution_add_column VALUES (5, 'eve', 50),
(6, 'frank', 60)"
+ )
+
+ # ===== Mixed-format Schema Evolution: Type Promotion (INT -> BIGINT) =====
+ # Old Parquet files have value as INT; new ORC/Avro files have value as BIGINT.
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS format_schema_evolution_type_promotion (
+ id INT,
+ value INT
+ ) USING paimon
+ TBLPROPERTIES (
+ 'file.format' = 'parquet'
+ )
+ """
+ )
+ spark.sql(
+ "INSERT INTO format_schema_evolution_type_promotion VALUES (1, 100),
(2, 200)"
+ )
+ spark.sql(
+ "ALTER TABLE format_schema_evolution_type_promotion ALTER COLUMN value
TYPE BIGINT"
+ )
+ spark.sql("ALTER TABLE format_schema_evolution_type_promotion SET
TBLPROPERTIES ('file.format' = 'orc')")
+ spark.sql(
+ "INSERT INTO format_schema_evolution_type_promotion VALUES (3,
3000000000), (4, 4000000000)"
+ )
+ spark.sql("ALTER TABLE format_schema_evolution_type_promotion SET
TBLPROPERTIES ('file.format' = 'avro')")
+ spark.sql(
+ "INSERT INTO format_schema_evolution_type_promotion VALUES (5,
5000000000), (6, 6000000000)"
+ )
+
# ===== Data Evolution + Schema Evolution: Add Column =====
# Combines data-evolution (row-tracking + MERGE INTO) with ALTER TABLE ADD COLUMNS.
# Old files lack the new column; MERGE INTO produces partial-column files.