This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new f99f5b7 test: add full types boundary consistency coverage (#351)
f99f5b7 is described below
commit f99f5b7c8feec290136f3e0c5379a9b706326b82
Author: QuakeWang <[email protected]>
AuthorDate: Tue Jun 2 20:41:54 2026 +0800
test: add full types boundary consistency coverage (#351)
---
crates/integration_tests/tests/read_tables.rs | 420 +++++++++++++++++++++++++-
dev/spark/provision.py | 116 +++++++
2 files changed, 534 insertions(+), 2 deletions(-)
diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs
index b89268c..fdf213a 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -18,8 +18,8 @@
//! Integration tests for reading Paimon tables provisioned by Spark.
use arrow_array::{
- Array, ArrowPrimitiveType, Int32Array, Int64Array, ListArray, MapArray,
RecordBatch,
- StringArray, StructArray,
+ Array, ArrowPrimitiveType, Int32Array, Int64Array, ListArray, MapArray,
PrimitiveArray,
+ RecordBatch, StringArray, StructArray,
};
use futures::TryStreamExt;
use paimon::api::ConfigResponse;
@@ -2638,3 +2638,419 @@ async fn test_read_full_types_table() {
assert_eq!(r.17, vec![("d".into(), 40), ("e".into(), 50)]); // map
assert_eq!(r.18, ("carol".into(), 300)); // struct
}
+
+/// Reads `full_types_boundary_table` and checks that boundary values and all-NULL rows decode
+/// identically from every data-file format the table was provisioned with.
+///
+/// `dev/spark/provision.py` writes rows 1-2 as parquet, rows 3-4 as ORC and rows 5-6 as avro.
+/// Odd rows hold boundary values (type min/max, zero, epoch-adjacent dates and timestamps,
+/// empty string/binary/collections, NULLs nested inside collections and structs); even rows
+/// are NULL in every column except `id`.
+#[tokio::test]
+async fn test_read_full_types_boundary_table() {
+    // `Int32Array`, `Int64Array`, `ListArray`, `MapArray`, `PrimitiveArray`, `RecordBatch`,
+    // `StringArray` and `StructArray` already come from the module-level `use arrow_array`.
+    use arrow_array::{
+        BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
+        Int16Array, Int8Array, TimestampMicrosecondArray,
+    };
+
+    /// One decoded row; `None` is SQL NULL. Decimals are the unscaled `i128` stored in
+    /// `Decimal128Array`, dates are `Date32` day counts and timestamps are the `i64` stored in
+    /// `TimestampMicrosecondArray`. `Default::default()` is the all-NULL row with `id == 0`.
+    #[derive(Debug, Default, PartialEq)]
+    struct BoundaryRow {
+        id: i32,
+        col_boolean: Option<bool>,
+        col_tinyint: Option<i8>,
+        col_smallint: Option<i16>,
+        col_int: Option<i32>,
+        col_bigint: Option<i64>,
+        col_float: Option<f32>,
+        col_double: Option<f64>,
+        col_decimal: Option<i128>,
+        col_decimal5: Option<i128>,
+        col_decimal38: Option<i128>,
+        col_string: Option<String>,
+        col_binary: Option<Vec<u8>>,
+        col_date: Option<i32>,
+        col_timestamp: Option<i64>,
+        col_timestamp_ltz: Option<i64>,
+        col_array: Option<Vec<Option<i32>>>,
+        col_map: Option<Vec<(String, Option<i32>)>>,
+        col_struct: Option<(Option<String>, Option<i32>)>,
+    }
+
+    /// Looks up column `name` in `batch` and downcasts it to `T`.
+    ///
+    /// Panics with the column name when the column is missing or has another Arrow type, so a
+    /// failure points at the offending column instead of a bare `unwrap` on `None`.
+    fn column<'a, T: Array + 'static>(batch: &'a RecordBatch, name: &str) -> &'a T {
+        batch
+            .column_by_name(name)
+            .unwrap_or_else(|| panic!("column `{name}` missing from batch"))
+            .as_any()
+            .downcast_ref::<T>()
+            .unwrap_or_else(|| panic!("column `{name}` is not a {}", std::any::type_name::<T>()))
+    }
+
+    /// Value at `row`, or `None` when the slot is NULL.
+    fn primitive_value<T: ArrowPrimitiveType>(
+        array: &PrimitiveArray<T>,
+        row: usize,
+    ) -> Option<T::Native> {
+        array.is_valid(row).then(|| array.value(row))
+    }
+
+    /// Boolean at `row`, or `None` when NULL.
+    fn bool_value(array: &BooleanArray, row: usize) -> Option<bool> {
+        array.is_valid(row).then(|| array.value(row))
+    }
+
+    /// Owned string at `row`, or `None` when NULL.
+    fn string_value(array: &StringArray, row: usize) -> Option<String> {
+        array.is_valid(row).then(|| array.value(row).to_string())
+    }
+
+    /// Owned bytes at `row`, or `None` when NULL.
+    fn binary_value(array: &BinaryArray, row: usize) -> Option<Vec<u8>> {
+        array.is_valid(row).then(|| array.value(row).to_vec())
+    }
+
+    /// `ARRAY<INT>` at `row`: `None` for a NULL list, otherwise the elements with NULL
+    /// elements preserved as `None`.
+    fn list_i32_value(array: &ListArray, row: usize) -> Option<Vec<Option<i32>>> {
+        if array.is_null(row) {
+            return None;
+        }
+        let values = array.value(row);
+        let values = values
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("list element as Int32Array");
+        // `PrimitiveArray::iter` already yields `Option<i32>`, with NULL slots as `None`.
+        Some(values.iter().collect())
+    }
+
+    /// `MAP<STRING, INT>` at `row`: `None` for a NULL map, otherwise `(key, value)` pairs
+    /// sorted by key, with NULL values as `None`.
+    fn map_string_i32_value(array: &MapArray, row: usize) -> Option<Vec<(String, Option<i32>)>> {
+        if array.is_null(row) {
+            return None;
+        }
+        let entries = array.value(row);
+        let entries = entries
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .expect("map entries as StructArray");
+        let keys = entries
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("map keys");
+        let values = entries
+            .column(1)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("map values");
+        let mut result: Vec<(String, Option<i32>)> = (0..keys.len())
+            .map(|i| (keys.value(i).to_string(), values.is_valid(i).then(|| values.value(i))))
+            .collect();
+        // Sort by key so the comparison does not depend on the order entries were decoded in.
+        result.sort_by(|left, right| left.0.cmp(&right.0));
+        Some(result)
+    }
+
+    /// `STRUCT<name: STRING, value: INT>` at `row`: `None` for a NULL struct, otherwise the
+    /// two fields, each `None` when NULL.
+    fn struct_string_i32_value(
+        array: &StructArray,
+        row: usize,
+    ) -> Option<(Option<String>, Option<i32>)> {
+        if array.is_null(row) {
+            return None;
+        }
+        let names = array
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("struct name");
+        let values = array
+            .column_by_name("value")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("struct value");
+        Some((
+            names.is_valid(row).then(|| names.value(row).to_string()),
+            values.is_valid(row).then(|| values.value(row)),
+        ))
+    }
+
+    let (plan, batches) = scan_and_read_with_fs_catalog("full_types_boundary_table", None).await;
+
+    // Every provisioned format must appear in the plan; otherwise the row assertions below
+    // would silently exercise fewer file readers.
+    let formats: HashSet<&str> = plan
+        .splits()
+        .iter()
+        .flat_map(|split| split.data_files())
+        .filter_map(|file| file.file_name.rsplit_once('.').map(|(_, ext)| ext))
+        .collect();
+    assert_eq!(
+        formats,
+        HashSet::from(["avro", "orc", "parquet"]),
+        "full_types_boundary_table should scan all provisioned file formats"
+    );
+
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(total_rows, 6, "full_types_boundary_table should have 6 rows");
+
+    let mut rows = Vec::with_capacity(total_rows);
+    for batch in &batches {
+        let id: &Int32Array = column(batch, "id");
+        let col_boolean: &BooleanArray = column(batch, "col_boolean");
+        let col_tinyint: &Int8Array = column(batch, "col_tinyint");
+        let col_smallint: &Int16Array = column(batch, "col_smallint");
+        let col_int: &Int32Array = column(batch, "col_int");
+        let col_bigint: &Int64Array = column(batch, "col_bigint");
+        let col_float: &Float32Array = column(batch, "col_float");
+        let col_double: &Float64Array = column(batch, "col_double");
+        let col_decimal: &Decimal128Array = column(batch, "col_decimal");
+        let col_decimal5: &Decimal128Array = column(batch, "col_decimal5");
+        let col_decimal38: &Decimal128Array = column(batch, "col_decimal38");
+        let col_string: &StringArray = column(batch, "col_string");
+        let col_binary: &BinaryArray = column(batch, "col_binary");
+        let col_date: &Date32Array = column(batch, "col_date");
+        let col_timestamp: &TimestampMicrosecondArray = column(batch, "col_timestamp");
+        let col_timestamp_ltz: &TimestampMicrosecondArray = column(batch, "col_timestamp_ltz");
+        let col_array: &ListArray = column(batch, "col_array");
+        let col_map: &MapArray = column(batch, "col_map");
+        let col_struct: &StructArray = column(batch, "col_struct");
+
+        rows.extend((0..batch.num_rows()).map(|i| BoundaryRow {
+            id: id.value(i),
+            col_boolean: bool_value(col_boolean, i),
+            col_tinyint: primitive_value(col_tinyint, i),
+            col_smallint: primitive_value(col_smallint, i),
+            col_int: primitive_value(col_int, i),
+            col_bigint: primitive_value(col_bigint, i),
+            col_float: primitive_value(col_float, i),
+            col_double: primitive_value(col_double, i),
+            col_decimal: primitive_value(col_decimal, i),
+            col_decimal5: primitive_value(col_decimal5, i),
+            col_decimal38: primitive_value(col_decimal38, i),
+            col_string: string_value(col_string, i),
+            col_binary: binary_value(col_binary, i),
+            col_date: primitive_value(col_date, i),
+            col_timestamp: primitive_value(col_timestamp, i),
+            col_timestamp_ltz: primitive_value(col_timestamp_ltz, i),
+            col_array: list_i32_value(col_array, i),
+            col_map: map_string_i32_value(col_map, i),
+            col_struct: struct_string_i32_value(col_struct, i),
+        }));
+    }
+    // Batches come from several files, so impose a stable order before comparing.
+    rows.sort_by_key(|row| row.id);
+
+    // NOTE(review): the `col_timestamp_ltz` expectations equal the NTZ ones, which presumably
+    // relies on the provisioning Spark session running in UTC (Spark interprets
+    // `TIMESTAMP '...'` literals in the session time zone). Confirm.
+    assert_eq!(
+        rows,
+        vec![
+            // parquet: minimum values, negatives, empty string/binary, NULLs inside collections.
+            BoundaryRow {
+                id: 1,
+                col_boolean: Some(false),
+                col_tinyint: Some(i8::MIN),
+                col_smallint: Some(i16::MIN),
+                col_int: Some(i32::MIN),
+                col_bigint: Some(i64::MIN),
+                col_float: Some(-0.5),
+                col_double: Some(-1.25),
+                col_decimal: Some(-9_999_999_999),
+                col_decimal5: Some(-99999),
+                col_decimal38: Some(-99_999_999_999_999_999_999_999_999_999_999_999_999),
+                col_string: Some(String::new()),
+                col_binary: Some(Vec::new()),
+                col_date: Some(-1),
+                col_timestamp: Some(1),
+                col_timestamp_ltz: Some(1),
+                col_array: Some(vec![None, Some(i32::MIN), Some(0)]),
+                col_map: Some(vec![
+                    ("negative".into(), Some(i32::MIN)),
+                    ("zero".into(), None),
+                ]),
+                col_struct: Some((None, Some(-1))),
+            },
+            BoundaryRow { id: 2, ..Default::default() },
+            // orc: maximum values, epoch date/timestamps, empty collections.
+            BoundaryRow {
+                id: 3,
+                col_boolean: Some(true),
+                col_tinyint: Some(i8::MAX),
+                col_smallint: Some(i16::MAX),
+                col_int: Some(i32::MAX),
+                col_bigint: Some(i64::MAX),
+                col_float: Some(0.25),
+                col_double: Some(0.5),
+                col_decimal: Some(9_999_999_999),
+                col_decimal5: Some(99999),
+                col_decimal38: Some(99_999_999_999_999_999_999_999_999_999_999_999_999),
+                col_string: Some("orc-boundary".into()),
+                col_binary: Some(vec![0x00, 0xFF]),
+                col_date: Some(0),
+                col_timestamp: Some(0),
+                col_timestamp_ltz: Some(0),
+                col_array: Some(vec![]),
+                col_map: Some(vec![]),
+                col_struct: Some((Some("orc".into()), None)),
+            },
+            BoundaryRow { id: 4, ..Default::default() },
+            // avro: zeros and sub-second timestamps.
+            BoundaryRow {
+                id: 5,
+                col_boolean: Some(false),
+                col_tinyint: Some(0),
+                col_smallint: Some(0),
+                col_int: Some(0),
+                col_bigint: Some(0),
+                col_float: Some(0.0),
+                col_double: Some(0.0),
+                col_decimal: Some(0),
+                col_decimal5: Some(0),
+                col_decimal38: Some(0),
+                col_string: Some("avro-boundary".into()),
+                col_binary: Some(vec![0x01, 0x02]),
+                col_date: Some(1),
+                col_timestamp: Some(999_999),
+                col_timestamp_ltz: Some(999_999),
+                col_array: Some(vec![Some(7)]),
+                col_map: Some(vec![("seven".into(), Some(7))]),
+                col_struct: Some((Some("avro".into()), Some(7))),
+            },
+            BoundaryRow { id: 6, ..Default::default() },
+        ]
+    );
+}
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index 53bda45..c7c408d 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -822,6 +822,122 @@ def main():
"""
)
+ # ===== Full types boundary table: parquet, orc, avro =====
+ # Each format writes one boundary row and one all-null row for nullable
fields.
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS full_types_boundary_table (
+ id INT,
+ col_boolean BOOLEAN,
+ col_tinyint TINYINT,
+ col_smallint SMALLINT,
+ col_int INT,
+ col_bigint BIGINT,
+ col_float FLOAT,
+ col_double DOUBLE,
+ col_decimal DECIMAL(10, 2),
+ col_decimal5 DECIMAL(5),
+ col_decimal38 DECIMAL(38, 18),
+ col_string STRING,
+ col_binary BINARY,
+ col_date DATE,
+ col_timestamp TIMESTAMP_NTZ,
+ col_timestamp_ltz TIMESTAMP,
+ col_array ARRAY<INT>,
+ col_map MAP<STRING, INT>,
+ col_struct STRUCT<name: STRING, value: INT>
+ ) USING paimon
+ TBLPROPERTIES (
+ 'file.format' = 'parquet'
+ )
+ """
+ )
+ spark.sql(
+ """
+ INSERT INTO full_types_boundary_table VALUES
+ (1, false, CAST('-128' AS TINYINT), CAST('-32768' AS SMALLINT),
+ CAST('-2147483648' AS INT), CAST('-9223372036854775808' AS
BIGINT),
+ CAST(-0.5 AS FLOAT), -1.25, CAST('-99999999.99' AS DECIMAL(10,2)),
+ CAST('-99999' AS DECIMAL(5)),
+ CAST('-99999999999999999999.999999999999999999' AS
DECIMAL(38,18)),
+ '', X'',
+ DATE '1969-12-31',
+ TIMESTAMP_NTZ '1970-01-01 00:00:00.000001',
+ TIMESTAMP '1970-01-01 00:00:00.000001',
+ array(CAST(NULL AS INT), CAST('-2147483648' AS INT), CAST(0 AS
INT)),
+ map('negative', CAST('-2147483648' AS INT), 'zero', CAST(NULL AS
INT)),
+ named_struct('name', CAST(NULL AS STRING), 'value', CAST(-1 AS
INT))),
+ (2, CAST(NULL AS BOOLEAN), CAST(NULL AS TINYINT), CAST(NULL AS
SMALLINT),
+ CAST(NULL AS INT), CAST(NULL AS BIGINT),
+ CAST(NULL AS FLOAT), CAST(NULL AS DOUBLE), CAST(NULL AS
DECIMAL(10,2)),
+ CAST(NULL AS DECIMAL(5)), CAST(NULL AS DECIMAL(38,18)),
+ CAST(NULL AS STRING), CAST(NULL AS BINARY),
+ CAST(NULL AS DATE),
+ CAST(NULL AS TIMESTAMP_NTZ),
+ CAST(NULL AS TIMESTAMP),
+ CAST(NULL AS ARRAY<INT>),
+ CAST(NULL AS MAP<STRING, INT>),
+ CAST(NULL AS STRUCT<name: STRING, value: INT>))
+ """
+ )
+ spark.sql("ALTER TABLE full_types_boundary_table SET TBLPROPERTIES
('file.format' = 'orc')")
+ spark.sql(
+ """
+ INSERT INTO full_types_boundary_table VALUES
+ (3, true, CAST('127' AS TINYINT), CAST('32767' AS SMALLINT),
+ CAST('2147483647' AS INT), CAST('9223372036854775807' AS BIGINT),
+ CAST(0.25 AS FLOAT), 0.5, CAST('99999999.99' AS DECIMAL(10,2)),
+ CAST('99999' AS DECIMAL(5)),
+ CAST('99999999999999999999.999999999999999999' AS DECIMAL(38,18)),
+ 'orc-boundary', X'00FF',
+ DATE '1970-01-01',
+ TIMESTAMP_NTZ '1970-01-01 00:00:00',
+ TIMESTAMP '1970-01-01 00:00:00',
+ CAST(array() AS ARRAY<INT>),
+ CAST(map() AS MAP<STRING, INT>),
+ named_struct('name', 'orc', 'value', CAST(NULL AS INT))),
+ (4, CAST(NULL AS BOOLEAN), CAST(NULL AS TINYINT), CAST(NULL AS
SMALLINT),
+ CAST(NULL AS INT), CAST(NULL AS BIGINT),
+ CAST(NULL AS FLOAT), CAST(NULL AS DOUBLE), CAST(NULL AS
DECIMAL(10,2)),
+ CAST(NULL AS DECIMAL(5)), CAST(NULL AS DECIMAL(38,18)),
+ CAST(NULL AS STRING), CAST(NULL AS BINARY),
+ CAST(NULL AS DATE),
+ CAST(NULL AS TIMESTAMP_NTZ),
+ CAST(NULL AS TIMESTAMP),
+ CAST(NULL AS ARRAY<INT>),
+ CAST(NULL AS MAP<STRING, INT>),
+ CAST(NULL AS STRUCT<name: STRING, value: INT>))
+ """
+ )
+ spark.sql("ALTER TABLE full_types_boundary_table SET TBLPROPERTIES
('file.format' = 'avro')")
+ spark.sql(
+ """
+ INSERT INTO full_types_boundary_table VALUES
+ (5, false, CAST(0 AS TINYINT), CAST(0 AS SMALLINT),
+ 0, 0,
+ CAST(0.0 AS FLOAT), 0.0, CAST(0.00 AS DECIMAL(10,2)),
+ CAST(0 AS DECIMAL(5)), CAST(0 AS DECIMAL(38,18)),
+ 'avro-boundary', X'0102',
+ DATE '1970-01-02',
+ TIMESTAMP_NTZ '1970-01-01 00:00:00.999999',
+ TIMESTAMP '1970-01-01 00:00:00.999999',
+ array(CAST(7 AS INT)),
+ map('seven', 7),
+ named_struct('name', 'avro', 'value', 7)),
+ (6, CAST(NULL AS BOOLEAN), CAST(NULL AS TINYINT), CAST(NULL AS
SMALLINT),
+ CAST(NULL AS INT), CAST(NULL AS BIGINT),
+ CAST(NULL AS FLOAT), CAST(NULL AS DOUBLE), CAST(NULL AS
DECIMAL(10,2)),
+ CAST(NULL AS DECIMAL(5)), CAST(NULL AS DECIMAL(38,18)),
+ CAST(NULL AS STRING), CAST(NULL AS BINARY),
+ CAST(NULL AS DATE),
+ CAST(NULL AS TIMESTAMP_NTZ),
+ CAST(NULL AS TIMESTAMP),
+ CAST(NULL AS ARRAY<INT>),
+ CAST(NULL AS MAP<STRING, INT>),
+ CAST(NULL AS STRUCT<name: STRING, value: INT>))
+ """
+ )
+
# ===== First-Row merge engine PK table =====
# first-row keeps the earliest inserted row per key; later duplicates are
ignored.