This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 9619f79 test(datafusion): add vortex SQL e2e (#321)
9619f79 is described below
commit 9619f79752fd7ea5bb228302e1f2ee7542234fff
Author: Jiwen liu <[email protected]>
AuthorDate: Tue May 19 15:06:17 2026 +0800
test(datafusion): add vortex SQL e2e (#321)
---
.github/workflows/ci.yml | 6 ++
crates/integrations/datafusion/Cargo.toml | 1 +
crates/integrations/datafusion/tests/common/mod.rs | 18 ++++-
.../integrations/datafusion/tests/vortex_tables.rs | 80 +++++++++++++++++++
crates/paimon/src/arrow/format/vortex.rs | 90 +++++++++++++++++++++-
5 files changed, 190 insertions(+), 5 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 990a99c..3a807d8 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -126,6 +126,12 @@ jobs:
RUST_LOG: DEBUG
RUST_BACKTRACE: full
+ - name: DataFusion Vortex Integration Test
+ run: cargo test -p paimon-datafusion --features vortex --test vortex_tables
+ env:
+ RUST_LOG: DEBUG
+ RUST_BACKTRACE: full
+
- name: Install uv
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78
with:
diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
index 30b5100..3e7392d 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -29,6 +29,7 @@ keywords = ["paimon", "datafusion", "integrations"]
[features]
fulltext = ["paimon/fulltext"]
+vortex = ["paimon/vortex"]
[dependencies]
async-trait = "0.1"
diff --git a/crates/integrations/datafusion/tests/common/mod.rs b/crates/integrations/datafusion/tests/common/mod.rs
index d32734c..bcd0cd7 100644
--- a/crates/integrations/datafusion/tests/common/mod.rs
+++ b/crates/integrations/datafusion/tests/common/mod.rs
@@ -54,9 +54,24 @@ pub async fn setup_sql_context() -> (TempDir, SQLContext) {
#[allow(dead_code)]
pub async fn collect_id_name(sql_context: &SQLContext, sql: &str) -> Vec<(i32, String)> {
+ let mut rows = collect_id_name_in_batch_order(sql_context, sql).await;
+ rows.sort_by_key(|(id, _)| *id);
+ rows
+}
+
+#[allow(dead_code)]
+pub async fn collect_id_name_in_batch_order(
+ sql_context: &SQLContext,
+ sql: &str,
+) -> Vec<(i32, String)> {
let batches = sql_context.sql(sql).await.unwrap().collect().await.unwrap();
+ collect_id_name_from_batches_in_order(&batches)
+}
+
+#[allow(dead_code)]
+pub fn collect_id_name_from_batches_in_order(batches: &[RecordBatch]) -> Vec<(i32, String)> {
let mut rows = Vec::new();
- for batch in &batches {
+ for batch in batches {
let ids = batch
.column_by_name("id")
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
@@ -69,7 +84,6 @@ pub async fn collect_id_name(sql_context: &SQLContext, sql: &str) -> Vec<(i32, S
rows.push((ids.value(i), names.value(i).to_string()));
}
}
- rows.sort_by_key(|(id, _)| *id);
rows
}
diff --git a/crates/integrations/datafusion/tests/vortex_tables.rs b/crates/integrations/datafusion/tests/vortex_tables.rs
new file mode 100644
index 0000000..9492429
--- /dev/null
+++ b/crates/integrations/datafusion/tests/vortex_tables.rs
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![cfg(feature = "vortex")]
+
+//! Vortex file format SQL end-to-end tests.
+
+mod common;
+
+use std::path::Path;
+
+#[tokio::test]
+async fn test_vortex_file_format_sql_e2e() {
+ let (tmp, sql_context) = common::setup_sql_context().await;
+
+ common::exec(
+ &sql_context,
+ "CREATE TABLE paimon.test_db.t (
+ id INT,
+ name STRING
+ ) WITH (
+ 'file.format' = 'vortex'
+ )",
+ )
+ .await;
+
+ common::exec(
+ &sql_context,
+ "INSERT INTO paimon.test_db.t VALUES (1, 'Alice'), (2, 'Bob')",
+ )
+ .await;
+
+ assert!(
+ contains_vortex_file(tmp.path()),
+ "expected Vortex data file"
+ );
+
+ let rows = common::collect_id_name_in_batch_order(
+ &sql_context,
+ "SELECT id, name FROM paimon.test_db.t ORDER BY id",
+ )
+ .await;
+ assert_eq!(rows, vec![(1, "Alice".to_string()), (2, "Bob".to_string())]);
+
+ let filtered = common::collect_id_name_in_batch_order(
+ &sql_context,
+ "SELECT id, name FROM paimon.test_db.t WHERE id = 2",
+ )
+ .await;
+ assert_eq!(filtered, vec![(2, "Bob".to_string())]);
+}
+
+fn contains_vortex_file(path: &Path) -> bool {
+ let entries = std::fs::read_dir(path).expect("read warehouse dir");
+ for entry in entries {
+ let path = entry.expect("read dir entry").path();
+ if path.is_dir() {
+ if contains_vortex_file(&path) {
+ return true;
+ }
+ } else if path.extension().is_some_and(|ext| ext == "vortex") {
+ return true;
+ }
+ }
+ false
+}
diff --git a/crates/paimon/src/arrow/format/vortex.rs b/crates/paimon/src/arrow/format/vortex.rs
index 3b7ba27..f0a3a48 100644
--- a/crates/paimon/src/arrow/format/vortex.rs
+++ b/crates/paimon/src/arrow/format/vortex.rs
@@ -21,7 +21,7 @@ use crate::spec::{DataField, Datum, Predicate, PredicateOperator};
use crate::table::{ArrowRecordBatchStream, RowRange};
use crate::Error;
use arrow_array::RecordBatch;
-use arrow_schema::SchemaRef;
+use arrow_schema::{DataType as ArrowDataType, SchemaRef};
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::StreamExt;
@@ -472,7 +472,7 @@ fn vortex_array_to_record_batch(
schema: &SchemaRef,
) -> crate::Result<RecordBatch> {
let arrow_array = vortex_array
- .into_arrow_preferred()
+ .into_arrow(&ArrowDataType::Struct(schema.fields().clone()))
.map_err(|e| Error::DataInvalid {
message: format!("Failed to convert Vortex array to Arrow: {e}"),
source: None,
@@ -486,6 +486,17 @@ fn vortex_array_to_record_batch(
source: None,
})?;
+ if struct_array.columns().len() != schema.fields().len() {
+ return Err(Error::DataInvalid {
+ message: format!(
+ "Vortex column count {} does not match target schema column count {}",
+ struct_array.columns().len(),
+ schema.fields().len()
+ ),
+ source: None,
+ });
+ }
+
RecordBatch::try_new(schema.clone(), struct_array.columns().to_vec()).map_err(|e| {
Error::DataInvalid {
message: format!("Failed to build RecordBatch from Vortex data: {e}"),
@@ -680,7 +691,8 @@ mod tests {
use super::*;
use crate::arrow::format::FormatFileWriter;
use crate::io::FileIOBuilder;
- use arrow_array::Int32Array;
+ use crate::spec::{DataField, DataType, VarCharType};
+ use arrow_array::{Int32Array, StringArray};
use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
fn test_arrow_schema() -> Arc<ArrowSchema> {
@@ -758,6 +770,78 @@ mod tests {
assert_eq!(total_rows, 3);
}
+ #[tokio::test]
+ async fn test_vortex_reader_returns_utf8_for_string_schema() {
+ let file_io = FileIOBuilder::new("memory").build().unwrap();
+ let path = "memory:/test_vortex_utf8_schema.vortex";
+ let output = file_io.new_output(path).unwrap();
+ let schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("id", ArrowDataType::Int32, false),
+ ArrowField::new("name", ArrowDataType::Utf8, true),
+ ]));
+
+ let mut writer: Box<dyn FormatFileWriter> = Box::new(
+ VortexFormatWriter::new(&output, schema.clone())
+ .await
+ .unwrap(),
+ );
+ let batch = RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2])),
+ Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob")])),
+ ],
+ )
+ .unwrap();
+ writer.write(&batch).await.unwrap();
+ writer.close().await.unwrap();
+
+ let input = file_io.new_input(path).unwrap();
+ let file_reader = input.reader().await.unwrap();
+ let metadata = input.metadata().await.unwrap();
+ let read_fields = vec![
+ DataField::new(
+ 0,
+ "id".to_string(),
+ DataType::Int(crate::spec::IntType::new()),
+ ),
+ DataField::new(
+ 1,
+ "name".to_string(),
+ DataType::VarChar(VarCharType::string_type()),
+ ),
+ ];
+
+ let reader = VortexFormatReader;
+ let mut stream = reader
+ .read_batch_stream(
+ Box::new(file_reader),
+ metadata.size,
+ &read_fields,
+ None,
+ None,
+ None,
+ )
+ .await
+ .unwrap();
+
+ let mut names = Vec::new();
+ while let Some(result) = stream.next().await {
+ let batch = result.unwrap();
+ assert_eq!(batch.schema().field(1).data_type(), &ArrowDataType::Utf8);
+ assert_eq!(batch.column(1).data_type(), &ArrowDataType::Utf8);
+ let name_col = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ for i in 0..batch.num_rows() {
+ names.push(name_col.value(i).to_string());
+ }
+ }
+ assert_eq!(names, vec!["Alice".to_string(), "Bob".to_string()]);
+ }
+
#[tokio::test]
async fn test_vortex_writer_multiple_batches() {
let file_io = FileIOBuilder::new("memory").build().unwrap();