This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new f99f5b7  test: add full types boundary consistency coverage (#351)
f99f5b7 is described below

commit f99f5b7c8feec290136f3e0c5379a9b706326b82
Author: QuakeWang <[email protected]>
AuthorDate: Tue Jun 2 20:41:54 2026 +0800

    test: add full types boundary consistency coverage (#351)
---
 crates/integration_tests/tests/read_tables.rs | 420 +++++++++++++++++++++++++-
 dev/spark/provision.py                        | 116 +++++++
 2 files changed, 534 insertions(+), 2 deletions(-)

diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs
index b89268c..fdf213a 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -18,8 +18,8 @@
 //! Integration tests for reading Paimon tables provisioned by Spark.
 
 use arrow_array::{
-    Array, ArrowPrimitiveType, Int32Array, Int64Array, ListArray, MapArray, RecordBatch,
-    StringArray, StructArray,
+    Array, ArrowPrimitiveType, Int32Array, Int64Array, ListArray, MapArray, PrimitiveArray,
+    RecordBatch, StringArray, StructArray,
 };
 use futures::TryStreamExt;
 use paimon::api::ConfigResponse;
@@ -2638,3 +2638,419 @@ async fn test_read_full_types_table() {
     assert_eq!(r.17, vec![("d".into(), 40), ("e".into(), 50)]); // map
     assert_eq!(r.18, ("carol".into(), 300)); // struct
 }
+
+#[tokio::test]
+async fn test_read_full_types_boundary_table() {
+    use arrow_array::{
+        BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
+        Int16Array, Int64Array, Int8Array, ListArray, MapArray, StructArray,
+        TimestampMicrosecondArray,
+    };
+
+    #[derive(Debug, PartialEq)]
+    struct BoundaryRow {
+        id: i32,
+        col_boolean: Option<bool>,
+        col_tinyint: Option<i8>,
+        col_smallint: Option<i16>,
+        col_int: Option<i32>,
+        col_bigint: Option<i64>,
+        col_float: Option<f32>,
+        col_double: Option<f64>,
+        col_decimal: Option<i128>,
+        col_decimal5: Option<i128>,
+        col_decimal38: Option<i128>,
+        col_string: Option<String>,
+        col_binary: Option<Vec<u8>>,
+        col_date: Option<i32>,
+        col_timestamp: Option<i64>,
+        col_timestamp_ltz: Option<i64>,
+        col_array: Option<Vec<Option<i32>>>,
+        col_map: Option<Vec<(String, Option<i32>)>>,
+        col_struct: Option<(Option<String>, Option<i32>)>,
+    }
+
+    fn primitive_value<T: ArrowPrimitiveType>(
+        array: &PrimitiveArray<T>,
+        row: usize,
+    ) -> Option<T::Native> {
+        (!array.is_null(row)).then(|| array.value(row))
+    }
+
+    fn bool_value(array: &BooleanArray, row: usize) -> Option<bool> {
+        (!array.is_null(row)).then(|| array.value(row))
+    }
+
+    fn string_value(array: &StringArray, row: usize) -> Option<String> {
+        (!array.is_null(row)).then(|| array.value(row).to_string())
+    }
+
+    fn binary_value(array: &BinaryArray, row: usize) -> Option<Vec<u8>> {
+        (!array.is_null(row)).then(|| array.value(row).to_vec())
+    }
+
+    fn list_i32_value(array: &ListArray, row: usize) -> Option<Vec<Option<i32>>> {
+        if array.is_null(row) {
+            return None;
+        }
+        let values = array.value(row);
+        let values = values
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("list element as Int32Array");
+        Some(
+            (0..values.len())
+                .map(|i| (!values.is_null(i)).then(|| values.value(i)))
+                .collect(),
+        )
+    }
+
+    fn map_string_i32_value(array: &MapArray, row: usize) -> Option<Vec<(String, Option<i32>)>> {
+        if array.is_null(row) {
+            return None;
+        }
+        let entries = array.value(row);
+        let entries = entries
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .expect("map entries as StructArray");
+        let keys = entries
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("map keys");
+        let values = entries
+            .column(1)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("map values");
+        let mut result: Vec<(String, Option<i32>)> = (0..keys.len())
+            .map(|i| {
+                (
+                    keys.value(i).to_string(),
+                    (!values.is_null(i)).then(|| values.value(i)),
+                )
+            })
+            .collect();
+        result.sort_by(|left, right| left.0.cmp(&right.0));
+        Some(result)
+    }
+
+    fn struct_string_i32_value(
+        array: &StructArray,
+        row: usize,
+    ) -> Option<(Option<String>, Option<i32>)> {
+        if array.is_null(row) {
+            return None;
+        }
+        let names = array
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("struct name");
+        let values = array
+            .column_by_name("value")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("struct value");
+        Some((
+            (!names.is_null(row)).then(|| names.value(row).to_string()),
+            (!values.is_null(row)).then(|| values.value(row)),
+        ))
+    }
+
+    let (plan, batches) = scan_and_read_with_fs_catalog("full_types_boundary_table", None).await;
+    let formats: HashSet<&str> = plan
+        .splits()
+        .iter()
+        .flat_map(|split| split.data_files())
+        .filter_map(|file| file.file_name.rsplit_once('.').map(|(_, ext)| ext))
+        .collect();
+    assert_eq!(
+        formats,
+        HashSet::from(["avro", "orc", "parquet"]),
+        "full_types_boundary_table should scan all provisioned file formats"
+    );
+
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(
+        total_rows, 6,
+        "full_types_boundary_table should have 6 rows"
+    );
+
+    let mut rows = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let col_boolean = batch
+            .column_by_name("col_boolean")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .unwrap();
+        let col_tinyint = batch
+            .column_by_name("col_tinyint")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int8Array>()
+            .unwrap();
+        let col_smallint = batch
+            .column_by_name("col_smallint")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int16Array>()
+            .unwrap();
+        let col_int = batch
+            .column_by_name("col_int")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let col_bigint = batch
+            .column_by_name("col_bigint")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        let col_float = batch
+            .column_by_name("col_float")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Float32Array>()
+            .unwrap();
+        let col_double = batch
+            .column_by_name("col_double")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .unwrap();
+        let col_decimal = batch
+            .column_by_name("col_decimal")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+        let col_decimal5 = batch
+            .column_by_name("col_decimal5")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+        let col_decimal38 = batch
+            .column_by_name("col_decimal38")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+        let col_string = batch
+            .column_by_name("col_string")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let col_binary = batch
+            .column_by_name("col_binary")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+        let col_date = batch
+            .column_by_name("col_date")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Date32Array>()
+            .unwrap();
+        let col_timestamp = batch
+            .column_by_name("col_timestamp")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<TimestampMicrosecondArray>()
+            .unwrap();
+        let col_timestamp_ltz = batch
+            .column_by_name("col_timestamp_ltz")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<TimestampMicrosecondArray>()
+            .unwrap();
+        let col_array = batch
+            .column_by_name("col_array")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<ListArray>()
+            .unwrap();
+        let col_map = batch
+            .column_by_name("col_map")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<MapArray>()
+            .unwrap();
+        let col_struct = batch
+            .column_by_name("col_struct")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .unwrap();
+
+        for i in 0..batch.num_rows() {
+            rows.push(BoundaryRow {
+                id: id.value(i),
+                col_boolean: bool_value(col_boolean, i),
+                col_tinyint: primitive_value(col_tinyint, i),
+                col_smallint: primitive_value(col_smallint, i),
+                col_int: primitive_value(col_int, i),
+                col_bigint: primitive_value(col_bigint, i),
+                col_float: primitive_value(col_float, i),
+                col_double: primitive_value(col_double, i),
+                col_decimal: primitive_value(col_decimal, i),
+                col_decimal5: primitive_value(col_decimal5, i),
+                col_decimal38: primitive_value(col_decimal38, i),
+                col_string: string_value(col_string, i),
+                col_binary: binary_value(col_binary, i),
+                col_date: primitive_value(col_date, i),
+                col_timestamp: primitive_value(col_timestamp, i),
+                col_timestamp_ltz: primitive_value(col_timestamp_ltz, i),
+                col_array: list_i32_value(col_array, i),
+                col_map: map_string_i32_value(col_map, i),
+                col_struct: struct_string_i32_value(col_struct, i),
+            });
+        }
+    }
+    rows.sort_by_key(|row| row.id);
+
+    assert_eq!(
+        rows,
+        vec![
+            BoundaryRow {
+                id: 1,
+                col_boolean: Some(false),
+                col_tinyint: Some(i8::MIN),
+                col_smallint: Some(i16::MIN),
+                col_int: Some(i32::MIN),
+                col_bigint: Some(i64::MIN),
+                col_float: Some(-0.5),
+                col_double: Some(-1.25),
+                col_decimal: Some(-9_999_999_999),
+                col_decimal5: Some(-99999),
+                col_decimal38: Some(-99_999_999_999_999_999_999_999_999_999_999_999_999),
+                col_string: Some(String::new()),
+                col_binary: Some(Vec::new()),
+                col_date: Some(-1),
+                col_timestamp: Some(1),
+                col_timestamp_ltz: Some(1),
+                col_array: Some(vec![None, Some(i32::MIN), Some(0)]),
+                col_map: Some(vec![
+                    ("negative".into(), Some(i32::MIN)),
+                    ("zero".into(), None)
+                ]),
+                col_struct: Some((None, Some(-1))),
+            },
+            BoundaryRow {
+                id: 2,
+                col_boolean: None,
+                col_tinyint: None,
+                col_smallint: None,
+                col_int: None,
+                col_bigint: None,
+                col_float: None,
+                col_double: None,
+                col_decimal: None,
+                col_decimal5: None,
+                col_decimal38: None,
+                col_string: None,
+                col_binary: None,
+                col_date: None,
+                col_timestamp: None,
+                col_timestamp_ltz: None,
+                col_array: None,
+                col_map: None,
+                col_struct: None,
+            },
+            BoundaryRow {
+                id: 3,
+                col_boolean: Some(true),
+                col_tinyint: Some(i8::MAX),
+                col_smallint: Some(i16::MAX),
+                col_int: Some(i32::MAX),
+                col_bigint: Some(i64::MAX),
+                col_float: Some(0.25),
+                col_double: Some(0.5),
+                col_decimal: Some(9_999_999_999),
+                col_decimal5: Some(99999),
+                col_decimal38: Some(99_999_999_999_999_999_999_999_999_999_999_999_999),
+                col_string: Some("orc-boundary".into()),
+                col_binary: Some(vec![0x00, 0xFF]),
+                col_date: Some(0),
+                col_timestamp: Some(0),
+                col_timestamp_ltz: Some(0),
+                col_array: Some(vec![]),
+                col_map: Some(vec![]),
+                col_struct: Some((Some("orc".into()), None)),
+            },
+            BoundaryRow {
+                id: 4,
+                col_boolean: None,
+                col_tinyint: None,
+                col_smallint: None,
+                col_int: None,
+                col_bigint: None,
+                col_float: None,
+                col_double: None,
+                col_decimal: None,
+                col_decimal5: None,
+                col_decimal38: None,
+                col_string: None,
+                col_binary: None,
+                col_date: None,
+                col_timestamp: None,
+                col_timestamp_ltz: None,
+                col_array: None,
+                col_map: None,
+                col_struct: None,
+            },
+            BoundaryRow {
+                id: 5,
+                col_boolean: Some(false),
+                col_tinyint: Some(0),
+                col_smallint: Some(0),
+                col_int: Some(0),
+                col_bigint: Some(0),
+                col_float: Some(0.0),
+                col_double: Some(0.0),
+                col_decimal: Some(0),
+                col_decimal5: Some(0),
+                col_decimal38: Some(0),
+                col_string: Some("avro-boundary".into()),
+                col_binary: Some(vec![0x01, 0x02]),
+                col_date: Some(1),
+                col_timestamp: Some(999_999),
+                col_timestamp_ltz: Some(999_999),
+                col_array: Some(vec![Some(7)]),
+                col_map: Some(vec![("seven".into(), Some(7))]),
+                col_struct: Some((Some("avro".into()), Some(7))),
+            },
+            BoundaryRow {
+                id: 6,
+                col_boolean: None,
+                col_tinyint: None,
+                col_smallint: None,
+                col_int: None,
+                col_bigint: None,
+                col_float: None,
+                col_double: None,
+                col_decimal: None,
+                col_decimal5: None,
+                col_decimal38: None,
+                col_string: None,
+                col_binary: None,
+                col_date: None,
+                col_timestamp: None,
+                col_timestamp_ltz: None,
+                col_array: None,
+                col_map: None,
+                col_struct: None,
+            },
+        ]
+    );
+}
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index 53bda45..c7c408d 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -822,6 +822,122 @@ def main():
         """
     )
 
+    # ===== Full types boundary table: parquet, orc, avro =====
+    # Each format writes one boundary row and one all-null row for nullable fields.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS full_types_boundary_table (
+            id INT,
+            col_boolean BOOLEAN,
+            col_tinyint TINYINT,
+            col_smallint SMALLINT,
+            col_int INT,
+            col_bigint BIGINT,
+            col_float FLOAT,
+            col_double DOUBLE,
+            col_decimal DECIMAL(10, 2),
+            col_decimal5 DECIMAL(5),
+            col_decimal38 DECIMAL(38, 18),
+            col_string STRING,
+            col_binary BINARY,
+            col_date DATE,
+            col_timestamp TIMESTAMP_NTZ,
+            col_timestamp_ltz TIMESTAMP,
+            col_array ARRAY<INT>,
+            col_map MAP<STRING, INT>,
+            col_struct STRUCT<name: STRING, value: INT>
+        ) USING paimon
+        TBLPROPERTIES (
+            'file.format' = 'parquet'
+        )
+        """
+    )
+    spark.sql(
+        """
+        INSERT INTO full_types_boundary_table VALUES
+            (1, false, CAST('-128' AS TINYINT), CAST('-32768' AS SMALLINT),
+             CAST('-2147483648' AS INT), CAST('-9223372036854775808' AS BIGINT),
+             CAST(-0.5 AS FLOAT), -1.25, CAST('-99999999.99' AS DECIMAL(10,2)),
+             CAST('-99999' AS DECIMAL(5)),
+             CAST('-99999999999999999999.999999999999999999' AS DECIMAL(38,18)),
+             '', X'',
+             DATE '1969-12-31',
+             TIMESTAMP_NTZ '1970-01-01 00:00:00.000001',
+             TIMESTAMP '1970-01-01 00:00:00.000001',
+             array(CAST(NULL AS INT), CAST('-2147483648' AS INT), CAST(0 AS INT)),
+             map('negative', CAST('-2147483648' AS INT), 'zero', CAST(NULL AS INT)),
+             named_struct('name', CAST(NULL AS STRING), 'value', CAST(-1 AS INT))),
+            (2, CAST(NULL AS BOOLEAN), CAST(NULL AS TINYINT), CAST(NULL AS SMALLINT),
+             CAST(NULL AS INT), CAST(NULL AS BIGINT),
+             CAST(NULL AS FLOAT), CAST(NULL AS DOUBLE), CAST(NULL AS DECIMAL(10,2)),
+             CAST(NULL AS DECIMAL(5)), CAST(NULL AS DECIMAL(38,18)),
+             CAST(NULL AS STRING), CAST(NULL AS BINARY),
+             CAST(NULL AS DATE),
+             CAST(NULL AS TIMESTAMP_NTZ),
+             CAST(NULL AS TIMESTAMP),
+             CAST(NULL AS ARRAY<INT>),
+             CAST(NULL AS MAP<STRING, INT>),
+             CAST(NULL AS STRUCT<name: STRING, value: INT>))
+        """
+    )
+    spark.sql("ALTER TABLE full_types_boundary_table SET TBLPROPERTIES ('file.format' = 'orc')")
+    spark.sql(
+        """
+        INSERT INTO full_types_boundary_table VALUES
+            (3, true, CAST('127' AS TINYINT), CAST('32767' AS SMALLINT),
+             CAST('2147483647' AS INT), CAST('9223372036854775807' AS BIGINT),
+             CAST(0.25 AS FLOAT), 0.5, CAST('99999999.99' AS DECIMAL(10,2)),
+             CAST('99999' AS DECIMAL(5)),
+             CAST('99999999999999999999.999999999999999999' AS DECIMAL(38,18)),
+             'orc-boundary', X'00FF',
+             DATE '1970-01-01',
+             TIMESTAMP_NTZ '1970-01-01 00:00:00',
+             TIMESTAMP '1970-01-01 00:00:00',
+             CAST(array() AS ARRAY<INT>),
+             CAST(map() AS MAP<STRING, INT>),
+             named_struct('name', 'orc', 'value', CAST(NULL AS INT))),
+            (4, CAST(NULL AS BOOLEAN), CAST(NULL AS TINYINT), CAST(NULL AS SMALLINT),
+             CAST(NULL AS INT), CAST(NULL AS BIGINT),
+             CAST(NULL AS FLOAT), CAST(NULL AS DOUBLE), CAST(NULL AS DECIMAL(10,2)),
+             CAST(NULL AS DECIMAL(5)), CAST(NULL AS DECIMAL(38,18)),
+             CAST(NULL AS STRING), CAST(NULL AS BINARY),
+             CAST(NULL AS DATE),
+             CAST(NULL AS TIMESTAMP_NTZ),
+             CAST(NULL AS TIMESTAMP),
+             CAST(NULL AS ARRAY<INT>),
+             CAST(NULL AS MAP<STRING, INT>),
+             CAST(NULL AS STRUCT<name: STRING, value: INT>))
+        """
+    )
+    spark.sql("ALTER TABLE full_types_boundary_table SET TBLPROPERTIES ('file.format' = 'avro')")
+    spark.sql(
+        """
+        INSERT INTO full_types_boundary_table VALUES
+            (5, false, CAST(0 AS TINYINT), CAST(0 AS SMALLINT),
+             0, 0,
+             CAST(0.0 AS FLOAT), 0.0, CAST(0.00 AS DECIMAL(10,2)),
+             CAST(0 AS DECIMAL(5)), CAST(0 AS DECIMAL(38,18)),
+             'avro-boundary', X'0102',
+             DATE '1970-01-02',
+             TIMESTAMP_NTZ '1970-01-01 00:00:00.999999',
+             TIMESTAMP '1970-01-01 00:00:00.999999',
+             array(CAST(7 AS INT)),
+             map('seven', 7),
+             named_struct('name', 'avro', 'value', 7)),
+            (6, CAST(NULL AS BOOLEAN), CAST(NULL AS TINYINT), CAST(NULL AS SMALLINT),
+             CAST(NULL AS INT), CAST(NULL AS BIGINT),
+             CAST(NULL AS FLOAT), CAST(NULL AS DOUBLE), CAST(NULL AS DECIMAL(10,2)),
+             CAST(NULL AS DECIMAL(5)), CAST(NULL AS DECIMAL(38,18)),
+             CAST(NULL AS STRING), CAST(NULL AS BINARY),
+             CAST(NULL AS DATE),
+             CAST(NULL AS TIMESTAMP_NTZ),
+             CAST(NULL AS TIMESTAMP),
+             CAST(NULL AS ARRAY<INT>),
+             CAST(NULL AS MAP<STRING, INT>),
+             CAST(NULL AS STRUCT<name: STRING, value: INT>))
+        """
+    )
+
 
     # ===== First-Row merge engine PK table =====
     # first-row keeps the earliest inserted row per key; later duplicates are ignored.

Reply via email to