This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 35b6386  feat: support complex types (#202)
35b6386 is described below

commit 35b6386629742b70d0d0719ebfcab743ef9a39f5
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Apr 4 10:44:18 2026 +0800

    feat: support complex types (#202)
---
 crates/integration_tests/tests/read_tables.rs      | 100 ++++++++++++++++++++-
 .../integrations/datafusion/tests/read_tables.rs   |  76 ++++++++++++++++
 crates/paimon/src/arrow/reader.rs                  |  19 ++--
 dev/spark/provision.py                             |  20 +++++
 4 files changed, 208 insertions(+), 7 deletions(-)

diff --git a/crates/integration_tests/tests/read_tables.rs 
b/crates/integration_tests/tests/read_tables.rs
index 2411a9c..c751c8d 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -17,7 +17,10 @@
 
 //! Integration tests for reading Paimon tables provisioned by Spark.
 
-use arrow_array::{Array, ArrowPrimitiveType, Int32Array, Int64Array, 
RecordBatch, StringArray};
+use arrow_array::{
+    Array, ArrowPrimitiveType, Int32Array, Int64Array, ListArray, MapArray, 
RecordBatch,
+    StringArray, StructArray,
+};
 use futures::TryStreamExt;
 use paimon::api::ConfigResponse;
 use paimon::catalog::{Identifier, RESTCatalog};
@@ -1369,3 +1372,98 @@ async fn test_read_schema_evolution_drop_column() {
         "Old rows should be readable after DROP COLUMN, with only remaining 
columns"
     );
 }
+
+// ---------------------------------------------------------------------------
+// Complex type integration tests
+// ---------------------------------------------------------------------------
+
+/// Test reading a table with complex types: ARRAY<INT>, MAP<STRING, INT>, 
STRUCT<name: STRING, value: INT>.
+#[tokio::test]
+async fn test_read_complex_type_table() {
+    let (_, batches) = scan_and_read_with_fs_catalog("complex_type_table", 
None).await;
+
+    #[allow(clippy::type_complexity)]
+    let mut rows: Vec<(i32, Vec<i32>, Vec<(String, i32)>, (String, i32))> = 
Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let int_array = batch
+            .column_by_name("int_array")
+            .and_then(|c| c.as_any().downcast_ref::<ListArray>())
+            .expect("int_array as ListArray");
+        let string_map = batch
+            .column_by_name("string_map")
+            .and_then(|c| c.as_any().downcast_ref::<MapArray>())
+            .expect("string_map as MapArray");
+        let row_field = batch
+            .column_by_name("row_field")
+            .and_then(|c| c.as_any().downcast_ref::<StructArray>())
+            .expect("row_field as StructArray");
+
+        for i in 0..batch.num_rows() {
+            // Extract ARRAY<INT>
+            let list_values = int_array.value(i);
+            let int_arr = list_values
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .expect("list element as Int32Array");
+            let arr_vals: Vec<i32> = (0..int_arr.len()).map(|j| 
int_arr.value(j)).collect();
+
+            // Extract MAP<STRING, INT>
+            let map_val = string_map.value(i);
+            let map_struct = map_val
+                .as_any()
+                .downcast_ref::<StructArray>()
+                .expect("map entries as StructArray");
+            let keys = map_struct
+                .column(0)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .expect("map keys");
+            let values = map_struct
+                .column(1)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .expect("map values");
+            let mut map_entries: Vec<(String, i32)> = (0..keys.len())
+                .map(|j| (keys.value(j).to_string(), values.value(j)))
+                .collect();
+            map_entries.sort_by(|a, b| a.0.cmp(&b.0));
+
+            // Extract STRUCT<name: STRING, value: INT>
+            let struct_name = row_field
+                .column_by_name("name")
+                .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+                .expect("struct name");
+            let struct_value = row_field
+                .column_by_name("value")
+                .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+                .expect("struct value");
+
+            rows.push((
+                id.value(i),
+                arr_vals,
+                map_entries,
+                (struct_name.value(i).to_string(), struct_value.value(i)),
+            ));
+        }
+    }
+    rows.sort_by_key(|(id, _, _, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (
+                1,
+                vec![1, 2, 3],
+                vec![("a".into(), 10), ("b".into(), 20)],
+                ("alice".into(), 100),
+            ),
+            (2, vec![4, 5], vec![("c".into(), 30)], ("bob".into(), 200),),
+            (3, vec![], vec![], ("carol".into(), 300),),
+        ],
+        "Complex type table should return correct ARRAY, MAP, and STRUCT 
values"
+    );
+}
diff --git a/crates/integrations/datafusion/tests/read_tables.rs 
b/crates/integrations/datafusion/tests/read_tables.rs
index 4a65a10..d438720 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -523,3 +523,79 @@ async fn test_data_evolution_drop_column_null_fill() {
         "Old rows should have extra=NULL, new row should have extra='new'"
     );
 }
+
+// ======================= Complex Type Tests =======================
+
+#[tokio::test]
+async fn test_read_complex_type_table_via_datafusion() {
+    let batches = collect_query(
+        "complex_type_table",
+        "SELECT id, int_array, string_map, row_field FROM complex_type_table 
ORDER BY id",
+    )
+    .await
+    .expect("Complex type query should succeed");
+
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(total_rows, 3, "Expected 3 rows from complex_type_table");
+
+    // Verify column types exist and are correct
+    for batch in &batches {
+        let schema = batch.schema();
+        assert!(
+            schema.field_with_name("int_array").is_ok(),
+            "int_array column should exist"
+        );
+        assert!(
+            schema.field_with_name("string_map").is_ok(),
+            "string_map column should exist"
+        );
+        assert!(
+            schema.field_with_name("row_field").is_ok(),
+            "row_field column should exist"
+        );
+    }
+
+    // Extract and verify data using Arrow arrays
+    let mut rows: Vec<(i32, String, String, String)> = Vec::new();
+    for batch in &batches {
+        let id_array = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("Expected Int32Array for id");
+        let int_array_col = 
batch.column_by_name("int_array").expect("int_array");
+        let string_map_col = 
batch.column_by_name("string_map").expect("string_map");
+        let row_field_col = 
batch.column_by_name("row_field").expect("row_field");
+
+        for i in 0..batch.num_rows() {
+            use datafusion::arrow::util::display::ArrayFormatter;
+            let fmt_opts = 
datafusion::arrow::util::display::FormatOptions::default();
+
+            let arr_fmt = ArrayFormatter::try_new(int_array_col.as_ref(), 
&fmt_opts).unwrap();
+            let map_fmt = ArrayFormatter::try_new(string_map_col.as_ref(), 
&fmt_opts).unwrap();
+            let row_fmt = ArrayFormatter::try_new(row_field_col.as_ref(), 
&fmt_opts).unwrap();
+
+            rows.push((
+                id_array.value(i),
+                arr_fmt.value(i).to_string(),
+                map_fmt.value(i).to_string(),
+                row_fmt.value(i).to_string(),
+            ));
+        }
+    }
+    rows.sort_by_key(|(id, _, _, _)| *id);
+
+    assert_eq!(rows[0].0, 1);
+    assert_eq!(rows[0].1, "[1, 2, 3]");
+    assert_eq!(rows[0].2, "{a: 10, b: 20}");
+    assert_eq!(rows[0].3, "{name: alice, value: 100}");
+
+    assert_eq!(rows[1].0, 2);
+    assert_eq!(rows[1].1, "[4, 5]");
+    assert_eq!(rows[1].2, "{c: 30}");
+    assert_eq!(rows[1].3, "{name: bob, value: 200}");
+
+    assert_eq!(rows[2].0, 3);
+    assert_eq!(rows[2].1, "[]");
+    assert_eq!(rows[2].2, "{}");
+    assert_eq!(rows[2].3, "{name: carol, value: 300}");
+}
diff --git a/crates/paimon/src/arrow/reader.rs 
b/crates/paimon/src/arrow/reader.rs
index 212f32f..a676e61 100644
--- a/crates/paimon/src/arrow/reader.rs
+++ b/crates/paimon/src/arrow/reader.rs
@@ -283,16 +283,23 @@ fn read_single_file_stream(
 
         let mut batch_stream_builder = 
ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
 
-        // Only project columns that exist in this file.
+        // Project columns by root-level index to correctly handle complex 
types
+        // (ARRAY, MAP, STRUCT). `ProjectionMask::columns` matches leaf column 
names
+        // which doesn't work for nested types; `ProjectionMask::roots` uses 
top-level
+        // field indices instead.
         let parquet_schema = batch_stream_builder.parquet_schema().clone();
-        let file_column_names: Vec<&str> = 
parquet_schema.columns().iter().map(|c| c.name()).collect();
-        let available_columns: Vec<&str> = parquet_column_names
+        let root_schema = parquet_schema.root_schema();
+        let root_indices: Vec<usize> = parquet_column_names
             .iter()
-            .filter(|name| file_column_names.contains(&name.as_str()))
-            .map(String::as_str)
+            .filter_map(|name| {
+                root_schema
+                    .get_fields()
+                    .iter()
+                    .position(|f| f.name() == name)
+            })
             .collect();
 
-        let mask = ProjectionMask::columns(&parquet_schema, 
available_columns.iter().copied());
+        let mask = ProjectionMask::roots(&parquet_schema, root_indices);
         batch_stream_builder = batch_stream_builder.with_projection(mask);
 
         if let Some(ref dv) = dv {
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index 7bca4d2..f22338b 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -515,6 +515,26 @@ def main():
         """
     )
 
+    # ===== Complex Types table: ARRAY, MAP, STRUCT =====
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS complex_type_table (
+            id INT,
+            int_array ARRAY<INT>,
+            string_map MAP<STRING, INT>,
+            row_field STRUCT<name: STRING, value: INT>
+        ) USING paimon
+        """
+    )
+    spark.sql(
+        """
+        INSERT INTO complex_type_table VALUES
+            (1, array(1, 2, 3), map('a', 10, 'b', 20), named_struct('name', 
'alice', 'value', 100)),
+            (2, array(4, 5), map('c', 30), named_struct('name', 'bob', 
'value', 200)),
+            (3, array(), map(), named_struct('name', 'carol', 'value', 300))
+        """
+    )
+
 
 if __name__ == "__main__":
     main()

Reply via email to