This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 35b6386 feat: support complex types (#202)
35b6386 is described below
commit 35b6386629742b70d0d0719ebfcab743ef9a39f5
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Apr 4 10:44:18 2026 +0800
feat: support complex types (#202)
---
crates/integration_tests/tests/read_tables.rs | 100 ++++++++++++++++++++-
.../integrations/datafusion/tests/read_tables.rs | 76 ++++++++++++++++
crates/paimon/src/arrow/reader.rs | 19 ++--
dev/spark/provision.py | 20 +++++
4 files changed, 208 insertions(+), 7 deletions(-)
diff --git a/crates/integration_tests/tests/read_tables.rs
b/crates/integration_tests/tests/read_tables.rs
index 2411a9c..c751c8d 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -17,7 +17,10 @@
//! Integration tests for reading Paimon tables provisioned by Spark.
-use arrow_array::{Array, ArrowPrimitiveType, Int32Array, Int64Array,
RecordBatch, StringArray};
+use arrow_array::{
+ Array, ArrowPrimitiveType, Int32Array, Int64Array, ListArray, MapArray,
RecordBatch,
+ StringArray, StructArray,
+};
use futures::TryStreamExt;
use paimon::api::ConfigResponse;
use paimon::catalog::{Identifier, RESTCatalog};
@@ -1369,3 +1372,98 @@ async fn test_read_schema_evolution_drop_column() {
"Old rows should be readable after DROP COLUMN, with only remaining
columns"
);
}
+
+// ---------------------------------------------------------------------------
+// Complex type integration tests
+// ---------------------------------------------------------------------------
+
+/// Test reading a table with complex types: `ARRAY<INT>`, `MAP<STRING, INT>` and
+/// `STRUCT<name: STRING, value: INT>`.
+///
+/// Every row is converted into a plain Rust tuple (map entries sorted by key) and the rows are
+/// sorted by `id`, so the final assertion does not depend on batch boundaries or map entry order.
+#[tokio::test]
+async fn test_read_complex_type_table() {
+    let (_, batches) = scan_and_read_with_fs_catalog("complex_type_table", None).await;
+
+    #[allow(clippy::type_complexity)] // (id, ARRAY, MAP entries, STRUCT) per row
+    let mut rows: Vec<(i32, Vec<i32>, Vec<(String, i32)>, (String, i32))> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let int_array = batch
+            .column_by_name("int_array")
+            .and_then(|c| c.as_any().downcast_ref::<ListArray>())
+            .expect("int_array as ListArray");
+        let string_map = batch
+            .column_by_name("string_map")
+            .and_then(|c| c.as_any().downcast_ref::<MapArray>())
+            .expect("string_map as MapArray");
+        let row_field = batch
+            .column_by_name("row_field")
+            .and_then(|c| c.as_any().downcast_ref::<StructArray>())
+            .expect("row_field as StructArray");
+
+        // The STRUCT children are indexed by row below, so they are the same arrays for every
+        // row of this batch: resolve and downcast them once here instead of once per row.
+        let struct_name = row_field
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("struct name");
+        let struct_value = row_field
+            .column_by_name("value")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("struct value");
+
+        for i in 0..batch.num_rows() {
+            // ARRAY<INT>: the child values of row `i`.
+            let list_values = int_array.value(i);
+            let int_arr = list_values
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .expect("list element as Int32Array");
+            let arr_vals: Vec<i32> = (0..int_arr.len()).map(|j| int_arr.value(j)).collect();
+
+            // MAP<STRING, INT>: the entries of row `i`, as a struct of (key, value) columns.
+            let map_val = string_map.value(i);
+            let map_struct = map_val
+                .as_any()
+                .downcast_ref::<StructArray>()
+                .expect("map entries as StructArray");
+            let keys = map_struct
+                .column(0)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .expect("map keys");
+            let values = map_struct
+                .column(1)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .expect("map values");
+            let mut map_entries: Vec<(String, i32)> = (0..keys.len())
+                .map(|j| (keys.value(j).to_string(), values.value(j)))
+                .collect();
+            // Sort by key so the comparison does not depend on map entry order.
+            map_entries.sort_by(|a, b| a.0.cmp(&b.0));
+
+            rows.push((
+                id.value(i),
+                arr_vals,
+                map_entries,
+                (struct_name.value(i).to_string(), struct_value.value(i)),
+            ));
+        }
+    }
+    rows.sort_by_key(|(id, _, _, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (
+                1,
+                vec![1, 2, 3],
+                vec![("a".into(), 10), ("b".into(), 20)],
+                ("alice".into(), 100),
+            ),
+            (2, vec![4, 5], vec![("c".into(), 30)], ("bob".into(), 200),),
+            (3, vec![], vec![], ("carol".into(), 300),),
+        ],
+        "Complex type table should return correct ARRAY, MAP, and STRUCT values"
+    );
+}
diff --git a/crates/integrations/datafusion/tests/read_tables.rs
b/crates/integrations/datafusion/tests/read_tables.rs
index 4a65a10..d438720 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -523,3 +523,79 @@ async fn test_data_evolution_drop_column_null_fill() {
"Old rows should have extra=NULL, new row should have extra='new'"
);
}
+
+// ======================= Complex Type Tests =======================
+
+/// Reads `complex_type_table` through a DataFusion SQL query and checks the display form of
+/// the ARRAY, MAP and STRUCT columns for each of the three rows.
+#[tokio::test]
+async fn test_read_complex_type_table_via_datafusion() {
+    use datafusion::arrow::util::display::{ArrayFormatter, FormatOptions};
+
+    let batches = collect_query(
+        "complex_type_table",
+        "SELECT id, int_array, string_map, row_field FROM complex_type_table ORDER BY id",
+    )
+    .await
+    .expect("Complex type query should succeed");
+
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(total_rows, 3, "Expected 3 rows from complex_type_table");
+
+    // Verify column types exist and are correct
+    for batch in &batches {
+        let schema = batch.schema();
+        assert!(
+            schema.field_with_name("int_array").is_ok(),
+            "int_array column should exist"
+        );
+        assert!(
+            schema.field_with_name("string_map").is_ok(),
+            "string_map column should exist"
+        );
+        assert!(
+            schema.field_with_name("row_field").is_ok(),
+            "row_field column should exist"
+        );
+    }
+
+    // The same default formatting options apply to every column and batch: build them once.
+    let fmt_opts = FormatOptions::default();
+
+    // (id, int_array, string_map, row_field) with the complex columns rendered as text.
+    let mut rows: Vec<(i32, String, String, String)> = Vec::with_capacity(total_rows);
+    for batch in &batches {
+        let id_array = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("Expected Int32Array for id");
+        let int_array_col = batch.column_by_name("int_array").expect("int_array");
+        let string_map_col = batch.column_by_name("string_map").expect("string_map");
+        let row_field_col = batch.column_by_name("row_field").expect("row_field");
+
+        // A formatter wraps a whole column, so one per column per batch is enough; the
+        // per-row loop below only asks it to render individual indices.
+        let arr_fmt = ArrayFormatter::try_new(int_array_col.as_ref(), &fmt_opts)
+            .expect("int_array should be formattable");
+        let map_fmt = ArrayFormatter::try_new(string_map_col.as_ref(), &fmt_opts)
+            .expect("string_map should be formattable");
+        let row_fmt = ArrayFormatter::try_new(row_field_col.as_ref(), &fmt_opts)
+            .expect("row_field should be formattable");
+
+        for i in 0..batch.num_rows() {
+            rows.push((
+                id_array.value(i),
+                arr_fmt.value(i).to_string(),
+                map_fmt.value(i).to_string(),
+                row_fmt.value(i).to_string(),
+            ));
+        }
+    }
+    rows.sort_by_key(|(id, _, _, _)| *id);
+
+    assert_eq!(rows[0].0, 1);
+    assert_eq!(rows[0].1, "[1, 2, 3]");
+    assert_eq!(rows[0].2, "{a: 10, b: 20}");
+    assert_eq!(rows[0].3, "{name: alice, value: 100}");
+
+    assert_eq!(rows[1].0, 2);
+    assert_eq!(rows[1].1, "[4, 5]");
+    assert_eq!(rows[1].2, "{c: 30}");
+    assert_eq!(rows[1].3, "{name: bob, value: 200}");
+
+    assert_eq!(rows[2].0, 3);
+    assert_eq!(rows[2].1, "[]");
+    assert_eq!(rows[2].2, "{}");
+    assert_eq!(rows[2].3, "{name: carol, value: 300}");
+}
diff --git a/crates/paimon/src/arrow/reader.rs
b/crates/paimon/src/arrow/reader.rs
index 212f32f..a676e61 100644
--- a/crates/paimon/src/arrow/reader.rs
+++ b/crates/paimon/src/arrow/reader.rs
@@ -283,16 +283,23 @@ fn read_single_file_stream(
let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
- // Only project columns that exist in this file.
+ // Project columns by root-level index to correctly handle complex
types
+ // (ARRAY, MAP, STRUCT). `ProjectionMask::columns` matches leaf column
names
+ // which doesn't work for nested types; `ProjectionMask::roots` uses
top-level
+ // field indices instead.
let parquet_schema = batch_stream_builder.parquet_schema().clone();
- let file_column_names: Vec<&str> =
parquet_schema.columns().iter().map(|c| c.name()).collect();
- let available_columns: Vec<&str> = parquet_column_names
+ let root_schema = parquet_schema.root_schema();
+ let root_indices: Vec<usize> = parquet_column_names
.iter()
- .filter(|name| file_column_names.contains(&name.as_str()))
- .map(String::as_str)
+ .filter_map(|name| {
+ root_schema
+ .get_fields()
+ .iter()
+ .position(|f| f.name() == name)
+ })
.collect();
- let mask = ProjectionMask::columns(&parquet_schema,
available_columns.iter().copied());
+ let mask = ProjectionMask::roots(&parquet_schema, root_indices);
batch_stream_builder = batch_stream_builder.with_projection(mask);
if let Some(ref dv) = dv {
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index 7bca4d2..f22338b 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -515,6 +515,26 @@ def main():
"""
)
+ # ===== Complex Types table: ARRAY, MAP, STRUCT =====
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS complex_type_table (
+ id INT,
+ int_array ARRAY<INT>,
+ string_map MAP<STRING, INT>,
+ row_field STRUCT<name: STRING, value: INT>
+ ) USING paimon
+ """
+ )
+ spark.sql(
+ """
+ INSERT INTO complex_type_table VALUES
+ (1, array(1, 2, 3), map('a', 10, 'b', 20), named_struct('name',
'alice', 'value', 100)),
+ (2, array(4, 5), map('c', 30), named_struct('name', 'bob',
'value', 200)),
+ (3, array(), map(), named_struct('name', 'carol', 'value', 300))
+ """
+ )
+
if __name__ == "__main__":
main()