This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d01002c332 [sqllogictest] port tests in avro.rs to sqllogictest (#6362)
d01002c332 is described below
commit d01002c33228a305fdb69d3e6a51465d0566598e
Author: elijah <[email protected]>
AuthorDate: Fri May 19 17:35:00 2023 +0800
[sqllogictest] port tests in avro.rs to sqllogictest (#6362)
* feat: port tests in avro.rs to sqllogictest
* fix: add test setup for avro_query_multiple_files
* run cargo fmt
* Run hash collisions test with avro support too
* fix clippy
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.github/workflows/rust.yml | 2 +-
datafusion/core/tests/sql/avro.rs | 157 ---------------------
datafusion/core/tests/sql/mod.rs | 1 -
datafusion/core/tests/sqllogictests/src/main.rs | 57 +++++++-
datafusion/core/tests/sqllogictests/src/setup.rs | 36 ++++-
.../core/tests/sqllogictests/test_files/avro.slt | 97 +++++++++++++
6 files changed, 183 insertions(+), 167 deletions(-)
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 654c0d5649..92c397461a 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -461,7 +461,7 @@ jobs:
- name: Run tests
run: |
cd datafusion
- cargo test --lib --tests --features=force_hash_collisions
+ cargo test --lib --tests --features=force_hash_collisions,avro
cargo-toml-formatting-checks:
name: check Cargo.toml formatting
diff --git a/datafusion/core/tests/sql/avro.rs b/datafusion/core/tests/sql/avro.rs
deleted file mode 100644
index 85ed30044c..0000000000
--- a/datafusion/core/tests/sql/avro.rs
+++ /dev/null
@@ -1,157 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use super::*;
-
-async fn register_alltypes_avro(ctx: &SessionContext) {
- let testdata = datafusion::test_util::arrow_test_data();
- ctx.register_avro(
- "alltypes_plain",
- &format!("{testdata}/avro/alltypes_plain.avro"),
- AvroReadOptions::default(),
- )
- .await
- .unwrap();
-}
-
-#[tokio::test]
-async fn avro_query() {
- let ctx = SessionContext::new();
- register_alltypes_avro(&ctx).await;
- // NOTE that string_col is actually a binary column and does not have the UTF8 logical type
- // so we need an explicit cast
- let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain";
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+----+---------------------------+",
- "| id | alltypes_plain.string_col |",
- "+----+---------------------------+",
- "| 4 | 0 |",
- "| 5 | 1 |",
- "| 6 | 0 |",
- "| 7 | 1 |",
- "| 2 | 0 |",
- "| 3 | 1 |",
- "| 0 | 0 |",
- "| 1 | 1 |",
- "+----+---------------------------+",
- ];
-
- assert_batches_eq!(expected, &actual);
-}
-
-#[tokio::test]
-async fn avro_query_multiple_files() {
- let tempdir = tempfile::tempdir().unwrap();
- let table_path = tempdir.path();
- let testdata = datafusion::test_util::arrow_test_data();
- let alltypes_plain_file = format!("{testdata}/avro/alltypes_plain.avro");
- std::fs::copy(
- &alltypes_plain_file,
- format!("{}/alltypes_plain1.avro", table_path.display()),
- )
- .unwrap();
- std::fs::copy(
- &alltypes_plain_file,
- format!("{}/alltypes_plain2.avro", table_path.display()),
- )
- .unwrap();
-
- let ctx = SessionContext::new();
- ctx.register_avro(
- "alltypes_plain",
- table_path.display().to_string().as_str(),
- AvroReadOptions::default(),
- )
- .await
- .unwrap();
- // NOTE that string_col is actually a binary column and does not have the UTF8 logical type
- // so we need an explicit cast
- let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain";
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+----+---------------------------+",
- "| id | alltypes_plain.string_col |",
- "+----+---------------------------+",
- "| 4 | 0 |",
- "| 5 | 1 |",
- "| 6 | 0 |",
- "| 7 | 1 |",
- "| 2 | 0 |",
- "| 3 | 1 |",
- "| 0 | 0 |",
- "| 1 | 1 |",
- "| 4 | 0 |",
- "| 5 | 1 |",
- "| 6 | 0 |",
- "| 7 | 1 |",
- "| 2 | 0 |",
- "| 3 | 1 |",
- "| 0 | 0 |",
- "| 1 | 1 |",
- "+----+---------------------------+",
- ];
-
- assert_batches_eq!(expected, &actual);
-}
-
-#[tokio::test]
-async fn avro_single_nan_schema() {
- let ctx = SessionContext::new();
- let testdata = datafusion::test_util::arrow_test_data();
- ctx.register_avro(
- "single_nan",
- &format!("{testdata}/avro/single_nan.avro"),
- AvroReadOptions::default(),
- )
- .await
- .unwrap();
- let sql = "SELECT mycol FROM single_nan";
- let dataframe = ctx.sql(sql).await.unwrap();
- let results = dataframe.collect().await.unwrap();
- for batch in results {
- assert_eq!(1, batch.num_rows());
- assert_eq!(1, batch.num_columns());
- }
-}
-
-#[tokio::test]
-async fn avro_explain() {
- let ctx = SessionContext::new();
- register_alltypes_avro(&ctx).await;
-
- let sql = "EXPLAIN SELECT count(*) from alltypes_plain";
- let actual = execute(&ctx, sql).await;
- let actual = normalize_vec_for_explain(actual);
- let expected = vec![
- vec![
- "logical_plan",
- "Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
- \n TableScan: alltypes_plain projection=[id]",
- ],
- vec![
- "physical_plan",
- "AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
- \n CoalescePartitionsExec\
- \n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
- \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
- \n AvroExec: file_groups={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, projection=[id]\
- \n",
- ],
- ];
- assert_eq!(expected, actual);
-}
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 3c5845fe22..af79a10104 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -81,7 +81,6 @@ macro_rules! test_expression {
pub mod aggregates;
pub mod arrow_files;
#[cfg(feature = "avro")]
-pub mod avro;
pub mod create_drop;
pub mod explain_analyze;
pub mod expr;
diff --git a/datafusion/core/tests/sqllogictests/src/main.rs b/datafusion/core/tests/sqllogictests/src/main.rs
index 841511decf..df43544867 100644
--- a/datafusion/core/tests/sqllogictests/src/main.rs
+++ b/datafusion/core/tests/sqllogictests/src/main.rs
@@ -22,6 +22,7 @@ use std::thread;
use log::info;
use sqllogictest::strict_column_validator;
+use tempfile::TempDir;
use datafusion::prelude::{SessionConfig, SessionContext};
@@ -83,7 +84,8 @@ async fn run_test_file(
relative_path: PathBuf,
) -> Result<(), Box<dyn Error>> {
info!("Running with DataFusion runner: {}", path.display());
- let ctx = context_for_test_file(&relative_path).await;
+ let test_ctx = context_for_test_file(&relative_path).await;
+ let ctx = test_ctx.session_ctx().clone();
let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, relative_path));
runner.with_column_validator(strict_column_validator);
runner.run_file_async(path).await?;
@@ -110,7 +112,8 @@ async fn run_complete_file(
info!("Using complete mode to complete: {}", path.display());
- let ctx = context_for_test_file(&relative_path).await;
+ let test_ctx = context_for_test_file(&relative_path).await;
+ let ctx = test_ctx.session_ctx().clone();
let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, relative_path));
let col_separator = " ";
runner
@@ -160,27 +163,67 @@ fn read_dir_recursive<P: AsRef<Path>>(path: P) -> Box<dyn Iterator<Item = PathBu
}
/// Create a SessionContext, configured for the specific test
-async fn context_for_test_file(relative_path: &Path) -> SessionContext {
+async fn context_for_test_file(relative_path: &Path) -> TestContext {
let config = SessionConfig::new()
// hardcode target partitions so plans are deterministic
.with_target_partitions(4);
- let ctx = SessionContext::with_config(config);
+ let mut test_ctx = TestContext::new(SessionContext::with_config(config));
match relative_path.file_name().unwrap().to_str().unwrap() {
"aggregate.slt" => {
info!("Registering aggregate tables");
- setup::register_aggregate_tables(&ctx).await;
+ setup::register_aggregate_tables(test_ctx.session_ctx()).await;
}
"scalar.slt" => {
info!("Registering scalar tables");
- setup::register_scalar_tables(&ctx).await;
+ setup::register_scalar_tables(test_ctx.session_ctx()).await;
+ }
+ "avro.slt" => {
+ info!("Registering avro tables");
+ setup::register_avro_tables(&mut test_ctx).await;
}
_ => {
info!("Using default SessionContext");
}
};
- ctx
+ test_ctx
+}
+
+/// Context for running tests
+pub struct TestContext {
+ /// Context for running queries
+ ctx: SessionContext,
+ /// Temporary directory created and cleared at the end of the test
+ test_dir: Option<TempDir>,
+}
+
+impl TestContext {
+ fn new(ctx: SessionContext) -> Self {
+ Self {
+ ctx,
+ test_dir: None,
+ }
+ }
+
+ /// Enables the test directory feature. If not enabled,
+ /// calling `testdir_path` will result in a panic.
+ fn enable_testdir(&mut self) {
+ if self.test_dir.is_none() {
+ self.test_dir = Some(TempDir::new().expect("failed to create testdir"));
+ }
+ }
+
+ /// Returns the path to the test directory. Panics if the test
+ /// directory feature is not enabled via `enable_testdir`.
+ fn testdir_path(&self) -> &Path {
+ self.test_dir.as_ref().expect("testdir not enabled").path()
+ }
+
+ /// Returns a reference to the internal SessionContext
+ fn session_ctx(&self) -> &SessionContext {
+ &self.ctx
+ }
}
/// Parsed command line options
diff --git a/datafusion/core/tests/sqllogictests/src/setup.rs b/datafusion/core/tests/sqllogictests/src/setup.rs
index 9e3f154f59..91ce03c5a0 100644
--- a/datafusion/core/tests/sqllogictests/src/setup.rs
+++ b/datafusion/core/tests/sqllogictests/src/setup.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::prelude::AvroReadOptions;
use datafusion::{
arrow::{
array::{
@@ -30,7 +31,40 @@ use datafusion::{
};
use std::sync::Arc;
-use crate::utils;
+use crate::{utils, TestContext};
+
+pub async fn register_avro_tables(ctx: &mut TestContext) {
+ register_avro_test_data(ctx).await;
+}
+
+async fn register_avro_test_data(ctx: &mut TestContext) {
+ ctx.enable_testdir();
+
+ let table_path = ctx.testdir_path().join("avro");
+ std::fs::create_dir(&table_path).expect("failed to create avro table path");
+
+ let testdata = datafusion::test_util::arrow_test_data();
+ let alltypes_plain_file = format!("{testdata}/avro/alltypes_plain.avro");
+ std::fs::copy(
+ &alltypes_plain_file,
+ format!("{}/alltypes_plain1.avro", table_path.display()),
+ )
+ .unwrap();
+ std::fs::copy(
+ &alltypes_plain_file,
+ format!("{}/alltypes_plain2.avro", table_path.display()),
+ )
+ .unwrap();
+
+ ctx.session_ctx()
+ .register_avro(
+ "alltypes_plain_multi_files",
+ table_path.display().to_string().as_str(),
+ AvroReadOptions::default(),
+ )
+ .await
+ .unwrap();
+}
pub async fn register_aggregate_tables(ctx: &SessionContext) {
register_aggregate_test_100(ctx).await;
diff --git a/datafusion/core/tests/sqllogictests/test_files/avro.slt b/datafusion/core/tests/sqllogictests/test_files/avro.slt
new file mode 100644
index 0000000000..5a01ae72cb
--- /dev/null
+++ b/datafusion/core/tests/sqllogictests/test_files/avro.slt
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+statement ok
+CREATE EXTERNAL TABLE alltypes_plain (
+ id INT NOT NULL,
+ bool_col BOOLEAN NOT NULL,
+ tinyint_col TINYINT NOT NULL,
+ smallint_col SMALLINT NOT NULL,
+ int_col INT NOT NULL,
+ bigint_col BIGINT NOT NULL,
+ float_col FLOAT NOT NULL,
+ double_col DOUBLE NOT NULL,
+ date_string_col BYTEA NOT NULL,
+ string_col VARCHAR NOT NULL,
+ timestamp_col TIMESTAMP NOT NULL,
+)
+STORED AS AVRO
+WITH HEADER ROW
+LOCATION '../../testing/data/avro/alltypes_plain.avro'
+
+statement ok
+CREATE EXTERNAL TABLE single_nan (
+ mycol FLOAT
+)
+STORED AS AVRO
+WITH HEADER ROW
+LOCATION '../../testing/data/avro/single_nan.avro'
+
+# test avro query
+query IT
+SELECT id, CAST(string_col AS varchar) FROM alltypes_plain
+----
+4 0
+5 1
+6 0
+7 1
+2 0
+3 1
+0 0
+1 1
+
+# test avro single nan schema
+query R
+SELECT mycol FROM single_nan
+----
+NULL
+
+# test avro query multi files
+query IT
+SELECT id, CAST(string_col AS varchar) FROM alltypes_plain_multi_files
+----
+4 0
+5 1
+6 0
+7 1
+2 0
+3 1
+0 0
+1 1
+4 0
+5 1
+6 0
+7 1
+2 0
+3 1
+0 0
+1 1
+
+# test avro explain
+query TT
+EXPLAIN SELECT count(*) from alltypes_plain
+----
+logical_plan
+Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
+--TableScan: alltypes_plain projection=[id]
+physical_plan
+AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]
+--CoalescePartitionsExec
+----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------AvroExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}, projection=[id]