This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 9619f79  test(datafusion): add vortex SQL e2e (#321)
9619f79 is described below

commit 9619f79752fd7ea5bb228302e1f2ee7542234fff
Author: Jiwen liu <[email protected]>
AuthorDate: Tue May 19 15:06:17 2026 +0800

    test(datafusion): add vortex SQL e2e (#321)
---
 .github/workflows/ci.yml                           |  6 ++
 crates/integrations/datafusion/Cargo.toml          |  1 +
 crates/integrations/datafusion/tests/common/mod.rs | 18 ++++-
 .../integrations/datafusion/tests/vortex_tables.rs | 80 +++++++++++++++++++
 crates/paimon/src/arrow/format/vortex.rs           | 90 +++++++++++++++++++++-
 5 files changed, 190 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 990a99c..3a807d8 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -126,6 +126,12 @@ jobs:
           RUST_LOG: DEBUG
           RUST_BACKTRACE: full
 
+      - name: DataFusion Vortex Integration Test
+        run: cargo test -p paimon-datafusion --features vortex --test vortex_tables
+        env:
+          RUST_LOG: DEBUG
+          RUST_BACKTRACE: full
+
       - name: Install uv
         uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78
         with:
diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
index 30b5100..3e7392d 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -29,6 +29,7 @@ keywords = ["paimon", "datafusion", "integrations"]
 
 [features]
 fulltext = ["paimon/fulltext"]
+vortex = ["paimon/vortex"]
 
 [dependencies]
 async-trait = "0.1"
diff --git a/crates/integrations/datafusion/tests/common/mod.rs b/crates/integrations/datafusion/tests/common/mod.rs
index d32734c..bcd0cd7 100644
--- a/crates/integrations/datafusion/tests/common/mod.rs
+++ b/crates/integrations/datafusion/tests/common/mod.rs
@@ -54,9 +54,24 @@ pub async fn setup_sql_context() -> (TempDir, SQLContext) {
 
 #[allow(dead_code)]
 pub async fn collect_id_name(sql_context: &SQLContext, sql: &str) -> Vec<(i32, String)> {
+    let mut rows = collect_id_name_in_batch_order(sql_context, sql).await;
+    rows.sort_by_key(|(id, _)| *id);
+    rows
+}
+
+#[allow(dead_code)]
+pub async fn collect_id_name_in_batch_order(
+    sql_context: &SQLContext,
+    sql: &str,
+) -> Vec<(i32, String)> {
     let batches = sql_context.sql(sql).await.unwrap().collect().await.unwrap();
+    collect_id_name_from_batches_in_order(&batches)
+}
+
+#[allow(dead_code)]
+pub fn collect_id_name_from_batches_in_order(batches: &[RecordBatch]) -> Vec<(i32, String)> {
     let mut rows = Vec::new();
-    for batch in &batches {
+    for batch in batches {
         let ids = batch
             .column_by_name("id")
             .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
@@ -69,7 +84,6 @@ pub async fn collect_id_name(sql_context: &SQLContext, sql: &str) -> Vec<(i32, S
             rows.push((ids.value(i), names.value(i).to_string()));
         }
     }
-    rows.sort_by_key(|(id, _)| *id);
     rows
 }
 
diff --git a/crates/integrations/datafusion/tests/vortex_tables.rs b/crates/integrations/datafusion/tests/vortex_tables.rs
new file mode 100644
index 0000000..9492429
--- /dev/null
+++ b/crates/integrations/datafusion/tests/vortex_tables.rs
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![cfg(feature = "vortex")]
+
+//! Vortex file format SQL end-to-end tests.
+
+mod common;
+
+use std::path::Path;
+
+#[tokio::test]
+async fn test_vortex_file_format_sql_e2e() {
+    let (tmp, sql_context) = common::setup_sql_context().await;
+
+    common::exec(
+        &sql_context,
+        "CREATE TABLE paimon.test_db.t (
+            id INT,
+            name STRING
+        ) WITH (
+            'file.format' = 'vortex'
+        )",
+    )
+    .await;
+
+    common::exec(
+        &sql_context,
+        "INSERT INTO paimon.test_db.t VALUES (1, 'Alice'), (2, 'Bob')",
+    )
+    .await;
+
+    assert!(
+        contains_vortex_file(tmp.path()),
+        "expected Vortex data file"
+    );
+
+    let rows = common::collect_id_name_in_batch_order(
+        &sql_context,
+        "SELECT id, name FROM paimon.test_db.t ORDER BY id",
+    )
+    .await;
+    assert_eq!(rows, vec![(1, "Alice".to_string()), (2, "Bob".to_string())]);
+
+    let filtered = common::collect_id_name_in_batch_order(
+        &sql_context,
+        "SELECT id, name FROM paimon.test_db.t WHERE id = 2",
+    )
+    .await;
+    assert_eq!(filtered, vec![(2, "Bob".to_string())]);
+}
+
+fn contains_vortex_file(path: &Path) -> bool {
+    let entries = std::fs::read_dir(path).expect("read warehouse dir");
+    for entry in entries {
+        let path = entry.expect("read dir entry").path();
+        if path.is_dir() {
+            if contains_vortex_file(&path) {
+                return true;
+            }
+        } else if path.extension().is_some_and(|ext| ext == "vortex") {
+            return true;
+        }
+    }
+    false
+}
diff --git a/crates/paimon/src/arrow/format/vortex.rs b/crates/paimon/src/arrow/format/vortex.rs
index 3b7ba27..f0a3a48 100644
--- a/crates/paimon/src/arrow/format/vortex.rs
+++ b/crates/paimon/src/arrow/format/vortex.rs
@@ -21,7 +21,7 @@ use crate::spec::{DataField, Datum, Predicate, PredicateOperator};
 use crate::table::{ArrowRecordBatchStream, RowRange};
 use crate::Error;
 use arrow_array::RecordBatch;
-use arrow_schema::SchemaRef;
+use arrow_schema::{DataType as ArrowDataType, SchemaRef};
 use async_trait::async_trait;
 use futures::future::BoxFuture;
 use futures::StreamExt;
@@ -472,7 +472,7 @@ fn vortex_array_to_record_batch(
     schema: &SchemaRef,
 ) -> crate::Result<RecordBatch> {
     let arrow_array = vortex_array
-        .into_arrow_preferred()
+        .into_arrow(&ArrowDataType::Struct(schema.fields().clone()))
         .map_err(|e| Error::DataInvalid {
             message: format!("Failed to convert Vortex array to Arrow: {e}"),
             source: None,
@@ -486,6 +486,17 @@ fn vortex_array_to_record_batch(
             source: None,
         })?;
 
+    if struct_array.columns().len() != schema.fields().len() {
+        return Err(Error::DataInvalid {
+            message: format!(
+                "Vortex column count {} does not match target schema column count {}",
+                struct_array.columns().len(),
+                schema.fields().len()
+            ),
+            source: None,
+        });
+    }
+
     RecordBatch::try_new(schema.clone(), struct_array.columns().to_vec()).map_err(|e| {
         Error::DataInvalid {
             message: format!("Failed to build RecordBatch from Vortex data: {e}"),
@@ -680,7 +691,8 @@ mod tests {
     use super::*;
     use crate::arrow::format::FormatFileWriter;
     use crate::io::FileIOBuilder;
-    use arrow_array::Int32Array;
+    use crate::spec::{DataField, DataType, VarCharType};
+    use arrow_array::{Int32Array, StringArray};
     use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
 
     fn test_arrow_schema() -> Arc<ArrowSchema> {
@@ -758,6 +770,78 @@ mod tests {
         assert_eq!(total_rows, 3);
     }
 
+    #[tokio::test]
+    async fn test_vortex_reader_returns_utf8_for_string_schema() {
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+        let path = "memory:/test_vortex_utf8_schema.vortex";
+        let output = file_io.new_output(path).unwrap();
+        let schema = Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("id", ArrowDataType::Int32, false),
+            ArrowField::new("name", ArrowDataType::Utf8, true),
+        ]));
+
+        let mut writer: Box<dyn FormatFileWriter> = Box::new(
+            VortexFormatWriter::new(&output, schema.clone())
+                .await
+                .unwrap(),
+        );
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2])),
+                Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob")])),
+            ],
+        )
+        .unwrap();
+        writer.write(&batch).await.unwrap();
+        writer.close().await.unwrap();
+
+        let input = file_io.new_input(path).unwrap();
+        let file_reader = input.reader().await.unwrap();
+        let metadata = input.metadata().await.unwrap();
+        let read_fields = vec![
+            DataField::new(
+                0,
+                "id".to_string(),
+                DataType::Int(crate::spec::IntType::new()),
+            ),
+            DataField::new(
+                1,
+                "name".to_string(),
+                DataType::VarChar(VarCharType::string_type()),
+            ),
+        ];
+
+        let reader = VortexFormatReader;
+        let mut stream = reader
+            .read_batch_stream(
+                Box::new(file_reader),
+                metadata.size,
+                &read_fields,
+                None,
+                None,
+                None,
+            )
+            .await
+            .unwrap();
+
+        let mut names = Vec::new();
+        while let Some(result) = stream.next().await {
+            let batch = result.unwrap();
+            assert_eq!(batch.schema().field(1).data_type(), &ArrowDataType::Utf8);
+            assert_eq!(batch.column(1).data_type(), &ArrowDataType::Utf8);
+            let name_col = batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap();
+            for i in 0..batch.num_rows() {
+                names.push(name_col.value(i).to_string());
+            }
+        }
+        assert_eq!(names, vec!["Alice".to_string(), "Bob".to_string()]);
+    }
+
     #[tokio::test]
     async fn test_vortex_writer_multiple_batches() {
         let file_io = FileIOBuilder::new("memory").build().unwrap();

Reply via email to