This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 91eec3f925 Port tests in projection.rs to sqllogictest (#8240)
91eec3f925 is described below
commit 91eec3f92567f979e6b104793669dfe5bf35390b
Author: Chojan Shang <[email protected]>
AuthorDate: Sat Nov 18 03:38:06 2023 +0800
Port tests in projection.rs to sqllogictest (#8240)
* Port tests in projection.rs to sqllogictest
Signed-off-by: Chojan Shang <[email protected]>
* Minor update
Signed-off-by: Chojan Shang <[email protected]>
* Make test happy
Signed-off-by: Chojan Shang <[email protected]>
* Minor update
Signed-off-by: Chojan Shang <[email protected]>
* Refine tests
Signed-off-by: Chojan Shang <[email protected]>
* chore: remove unused code
Signed-off-by: Chojan Shang <[email protected]>
---------
Signed-off-by: Chojan Shang <[email protected]>
---
datafusion/core/tests/sql/mod.rs | 18 --
datafusion/core/tests/sql/partitioned_csv.rs | 20 +-
datafusion/core/tests/sql/projection.rs | 373 ----------------------
datafusion/sqllogictest/test_files/projection.slt | 235 ++++++++++++++
4 files changed, 236 insertions(+), 410 deletions(-)
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 1d58bba876..b04ba573af 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -86,7 +86,6 @@ pub mod parquet;
pub mod parquet_schema;
pub mod partitioned_csv;
pub mod predicates;
-pub mod projection;
pub mod references;
pub mod repartition;
pub mod select;
@@ -455,23 +454,6 @@ async fn register_aggregate_csv_by_sql(ctx: &SessionContext) {
);
}
-async fn register_aggregate_simple_csv(ctx: &SessionContext) -> Result<()> {
- // It's not possible to use aggregate_test_100 as it doesn't have enough similar values to test grouping on floats.
- let schema = Arc::new(Schema::new(vec![
- Field::new("c1", DataType::Float32, false),
- Field::new("c2", DataType::Float64, false),
- Field::new("c3", DataType::Boolean, false),
- ]));
-
- ctx.register_csv(
- "aggregate_simple",
- "tests/data/aggregate_simple.csv",
- CsvReadOptions::new().schema(&schema),
- )
- .await?;
- Ok(())
-}
-
async fn register_aggregate_csv(ctx: &SessionContext) -> Result<()> {
let testdata = datafusion::test_util::arrow_test_data();
let schema = test_util::aggr_test_schema();
diff --git a/datafusion/core/tests/sql/partitioned_csv.rs b/datafusion/core/tests/sql/partitioned_csv.rs
index d5a1c2f0b4..b77557a66c 100644
--- a/datafusion/core/tests/sql/partitioned_csv.rs
+++ b/datafusion/core/tests/sql/partitioned_csv.rs
@@ -19,31 +19,13 @@
use std::{io::Write, sync::Arc};
-use arrow::{
- datatypes::{DataType, Field, Schema, SchemaRef},
- record_batch::RecordBatch,
-};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::{
error::Result,
prelude::{CsvReadOptions, SessionConfig, SessionContext},
};
use tempfile::TempDir;
-/// Execute SQL and return results
-async fn plan_and_collect(
- ctx: &mut SessionContext,
- sql: &str,
-) -> Result<Vec<RecordBatch>> {
- ctx.sql(sql).await?.collect().await
-}
-
-/// Execute SQL and return results
-pub async fn execute(sql: &str, partition_count: usize) -> Result<Vec<RecordBatch>> {
- let tmp_dir = TempDir::new()?;
- let mut ctx = create_ctx(&tmp_dir, partition_count).await?;
- plan_and_collect(&mut ctx, sql).await
-}
-
/// Generate CSV partitions within the supplied directory
fn populate_csv_partitions(
tmp_dir: &TempDir,
diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs
deleted file mode 100644
index b31cb34f52..0000000000
--- a/datafusion/core/tests/sql/projection.rs
+++ /dev/null
@@ -1,373 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use datafusion::datasource::provider_as_source;
-use datafusion::test_util::scan_empty;
-use datafusion_expr::{when, LogicalPlanBuilder, UNNAMED_TABLE};
-use tempfile::TempDir;
-
-use super::*;
-
-#[tokio::test]
-async fn projection_same_fields() -> Result<()> {
- let ctx = SessionContext::new();
-
- let sql = "select (1+1) as a from (select 1 as a) as b;";
- let actual = execute_to_batches(&ctx, sql).await;
-
- #[rustfmt::skip]
- let expected = ["+---+",
- "| a |",
- "+---+",
- "| 2 |",
- "+---+"];
- assert_batches_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn projection_type_alias() -> Result<()> {
- let ctx = SessionContext::new();
- register_aggregate_simple_csv(&ctx).await?;
-
- // Query that aliases one column to the name of a different column
- // that also has a different type (c1 == float32, c3 == boolean)
- let sql = "SELECT c1 as c3 FROM aggregate_simple ORDER BY c3 LIMIT 2";
- let actual = execute_to_batches(&ctx, sql).await;
-
- let expected = [
- "+---------+",
- "| c3 |",
- "+---------+",
- "| 0.00001 |",
- "| 0.00002 |",
- "+---------+",
- ];
- assert_batches_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn csv_query_group_by_avg_with_projection() -> Result<()> {
- let ctx = SessionContext::new();
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT avg(c12), c1 FROM aggregate_test_100 GROUP BY c1";
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = [
- "+-----------------------------+----+",
- "| AVG(aggregate_test_100.c12) | c1 |",
- "+-----------------------------+----+",
- "| 0.41040709263815384 | b |",
- "| 0.48600669271341534 | e |",
- "| 0.48754517466109415 | a |",
- "| 0.48855379387549824 | d |",
- "| 0.6600456536439784 | c |",
- "+-----------------------------+----+",
- ];
- assert_batches_sorted_eq!(expected, &actual);
- Ok(())
-}
-
-#[tokio::test]
-async fn parallel_projection() -> Result<()> {
- let partition_count = 4;
- let results =
- partitioned_csv::execute("SELECT c1, c2 FROM test", partition_count).await?;
-
- let expected = vec![
- "+----+----+",
- "| c1 | c2 |",
- "+----+----+",
- "| 3 | 1 |",
- "| 3 | 2 |",
- "| 3 | 3 |",
- "| 3 | 4 |",
- "| 3 | 5 |",
- "| 3 | 6 |",
- "| 3 | 7 |",
- "| 3 | 8 |",
- "| 3 | 9 |",
- "| 3 | 10 |",
- "| 2 | 1 |",
- "| 2 | 2 |",
- "| 2 | 3 |",
- "| 2 | 4 |",
- "| 2 | 5 |",
- "| 2 | 6 |",
- "| 2 | 7 |",
- "| 2 | 8 |",
- "| 2 | 9 |",
- "| 2 | 10 |",
- "| 1 | 1 |",
- "| 1 | 2 |",
- "| 1 | 3 |",
- "| 1 | 4 |",
- "| 1 | 5 |",
- "| 1 | 6 |",
- "| 1 | 7 |",
- "| 1 | 8 |",
- "| 1 | 9 |",
- "| 1 | 10 |",
- "| 0 | 1 |",
- "| 0 | 2 |",
- "| 0 | 3 |",
- "| 0 | 4 |",
- "| 0 | 5 |",
- "| 0 | 6 |",
- "| 0 | 7 |",
- "| 0 | 8 |",
- "| 0 | 9 |",
- "| 0 | 10 |",
- "+----+----+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn subquery_alias_case_insensitive() -> Result<()> {
- let partition_count = 1;
- let results =
- partitioned_csv::execute("SELECT V1.c1, v1.C2 FROM (SELECT test.C1, TEST.c2 FROM test) V1 ORDER BY v1.c1, V1.C2 LIMIT 1", partition_count).await?;
-
- let expected = [
- "+----+----+",
- "| c1 | c2 |",
- "+----+----+",
- "| 0 | 1 |",
- "+----+----+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn projection_on_table_scan() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let partition_count = 4;
- let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
-
- let table = ctx.table("test").await?;
- let logical_plan = LogicalPlanBuilder::from(table.into_optimized_plan()?)
- .project(vec![col("c2")])?
- .build()?;
-
- let state = ctx.state();
- let optimized_plan = state.optimize(&logical_plan)?;
- match &optimized_plan {
- LogicalPlan::TableScan(TableScan {
- source,
- projected_schema,
- ..
- }) => {
- assert_eq!(source.schema().fields().len(), 3);
- assert_eq!(projected_schema.fields().len(), 1);
- }
- _ => panic!("input to projection should be TableScan"),
- }
-
- let expected = "TableScan: test projection=[c2]";
- assert_eq!(format!("{optimized_plan:?}"), expected);
-
- let physical_plan = state.create_physical_plan(&optimized_plan).await?;
-
- assert_eq!(1, physical_plan.schema().fields().len());
- assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
- let batches = collect(physical_plan, state.task_ctx()).await?;
- assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::<usize>());
-
- Ok(())
-}
-
-#[tokio::test]
-async fn preserve_nullability_on_projection() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let ctx = partitioned_csv::create_ctx(&tmp_dir, 1).await?;
-
- let schema: Schema = ctx.table("test").await.unwrap().schema().clone().into();
- assert!(!schema.field_with_name("c1")?.is_nullable());
-
- let plan = scan_empty(None, &schema, None)?
- .project(vec![col("c1")])?
- .build()?;
-
- let dataframe = DataFrame::new(ctx.state(), plan);
- let physical_plan = dataframe.create_physical_plan().await?;
- assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable());
- Ok(())
-}
-
-#[tokio::test]
-async fn project_cast_dictionary() {
- let ctx = SessionContext::new();
-
- let host: DictionaryArray<Int32Type> = vec![Some("host1"), None, Some("host2")]
- .into_iter()
- .collect();
-
- let batch = RecordBatch::try_from_iter(vec![("host", Arc::new(host) as _)]).unwrap();
-
- let t = MemTable::try_new(batch.schema(), vec![vec![batch]]).unwrap();
-
- // Note that `host` is a dictionary array but `lit("")` is a DataType::Utf8 that needs to be cast
- let expr = when(col("host").is_null(), lit(""))
- .otherwise(col("host"))
- .unwrap();
-
- let projection = None;
- let builder = LogicalPlanBuilder::scan(
- "cpu_load_short",
- provider_as_source(Arc::new(t)),
- projection,
- )
- .unwrap();
-
- let logical_plan = builder.project(vec![expr]).unwrap().build().unwrap();
- let df = DataFrame::new(ctx.state(), logical_plan);
- let actual = df.collect().await.unwrap();
-
- let expected = ["+----------------------------------------------------------------------------------+",
- "| CASE WHEN cpu_load_short.host IS NULL THEN Utf8(\"\") ELSE cpu_load_short.host END |",
- "+----------------------------------------------------------------------------------+",
- "| host1 |",
- "| |",
- "| host2 |",
- "+----------------------------------------------------------------------------------+"];
- assert_batches_eq!(expected, &actual);
-}
-
-#[tokio::test]
-async fn projection_on_memory_scan() -> Result<()> {
- let schema = Schema::new(vec![
- Field::new("a", DataType::Int32, false),
- Field::new("b", DataType::Int32, false),
- Field::new("c", DataType::Int32, false),
- ]);
- let schema = SchemaRef::new(schema);
-
- let partitions = vec![vec![RecordBatch::try_new(
- schema.clone(),
- vec![
- Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
- Arc::new(Int32Array::from(vec![2, 12, 12, 120])),
- Arc::new(Int32Array::from(vec![3, 12, 12, 120])),
- ],
- )?]];
-
- let provider = Arc::new(MemTable::try_new(schema, partitions)?);
- let plan =
- LogicalPlanBuilder::scan(UNNAMED_TABLE, provider_as_source(provider), None)?
- .project(vec![col("b")])?
- .build()?;
- assert_fields_eq(&plan, vec!["b"]);
-
- let ctx = SessionContext::new();
- let state = ctx.state();
- let optimized_plan = state.optimize(&plan)?;
- match &optimized_plan {
- LogicalPlan::TableScan(TableScan {
- source,
- projected_schema,
- ..
- }) => {
- assert_eq!(source.schema().fields().len(), 3);
- assert_eq!(projected_schema.fields().len(), 1);
- }
- _ => panic!("input to projection should be InMemoryScan"),
- }
-
- let expected = format!("TableScan: {UNNAMED_TABLE} projection=[b]");
- assert_eq!(format!("{optimized_plan:?}"), expected);
-
- let physical_plan = state.create_physical_plan(&optimized_plan).await?;
-
- assert_eq!(1, physical_plan.schema().fields().len());
- assert_eq!("b", physical_plan.schema().field(0).name().as_str());
-
- let batches = collect(physical_plan, state.task_ctx()).await?;
- assert_eq!(1, batches.len());
- assert_eq!(1, batches[0].num_columns());
- assert_eq!(4, batches[0].num_rows());
-
- Ok(())
-}
-
-fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) {
- let actual: Vec<String> = plan
- .schema()
- .fields()
- .iter()
- .map(|f| f.name().clone())
- .collect();
- assert_eq!(actual, expected);
-}
-
-#[tokio::test]
-async fn project_column_with_same_name_as_relation() -> Result<()> {
- let ctx = SessionContext::new();
-
- let sql = "select a.a from (select 1 as a) as a;";
- let actual = execute_to_batches(&ctx, sql).await;
-
- let expected = ["+---+", "| a |", "+---+", "| 1 |", "+---+"];
- assert_batches_sorted_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn project_column_with_filters_that_cant_pushed_down_always_false() -> Result<()> {
- let ctx = SessionContext::new();
-
- let sql = "select * from (select 1 as a) f where f.a=2;";
- let actual = execute_to_batches(&ctx, sql).await;
-
- let expected = ["++", "++"];
- assert_batches_sorted_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn project_column_with_filters_that_cant_pushed_down_always_true() -> Result<()> {
- let ctx = SessionContext::new();
-
- let sql = "select * from (select 1 as a) f where f.a=1;";
- let actual = execute_to_batches(&ctx, sql).await;
-
- let expected = ["+---+", "| a |", "+---+", "| 1 |", "+---+"];
- assert_batches_sorted_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn project_columns_in_memory_without_propagation() -> Result<()> {
- let ctx = SessionContext::new();
-
- let sql = "select column1 as a from (values (1), (2)) f where f.column1 = 2;";
- let actual = execute_to_batches(&ctx, sql).await;
-
- let expected = ["+---+", "| a |", "+---+", "| 2 |", "+---+"];
- assert_batches_sorted_eq!(expected, &actual);
-
- Ok(())
-}
diff --git a/datafusion/sqllogictest/test_files/projection.slt b/datafusion/sqllogictest/test_files/projection.slt
new file mode 100644
index 0000000000..b752f5644b
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/projection.slt
@@ -0,0 +1,235 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+## Projection Statement Tests
+##########
+
+# prepare data
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100 (
+ c1 VARCHAR NOT NULL,
+ c2 TINYINT NOT NULL,
+ c3 SMALLINT NOT NULL,
+ c4 SMALLINT,
+ c5 INT,
+ c6 BIGINT NOT NULL,
+ c7 SMALLINT NOT NULL,
+ c8 INT NOT NULL,
+ c9 BIGINT UNSIGNED NOT NULL,
+ c10 VARCHAR NOT NULL,
+ c11 FLOAT NOT NULL,
+ c12 DOUBLE NOT NULL,
+ c13 VARCHAR NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+
+statement ok
+CREATE EXTERNAL TABLE aggregate_simple (
+ c1 FLOAT NOT NULL,
+ c2 DOUBLE NOT NULL,
+ c3 BOOLEAN NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION '../core/tests/data/aggregate_simple.csv'
+
+statement ok
+CREATE TABLE memory_table(a INT NOT NULL, b INT NOT NULL, c INT NOT NULL) AS VALUES
+(1, 2, 3),
+(10, 12, 12),
+(10, 12, 12),
+(100, 120, 120);
+
+statement ok
+CREATE TABLE cpu_load_short(host STRING NOT NULL) AS VALUES
+('host1'),
+('host2');
+
+statement ok
+CREATE EXTERNAL TABLE test (c1 int, c2 bigint, c3 boolean)
+STORED AS CSV LOCATION '../core/tests/data/partitioned_csv';
+
+statement ok
+CREATE EXTERNAL TABLE test_simple (c1 int, c2 bigint, c3 boolean)
+STORED AS CSV LOCATION '../core/tests/data/partitioned_csv/partition-0.csv';
+
+# projection same fields
+query I rowsort
+select (1+1) as a from (select 1 as a) as b;
+----
+2
+
+# projection type alias
+query R rowsort
+SELECT c1 as c3 FROM aggregate_simple ORDER BY c3 LIMIT 2;
+----
+0.00001
+0.00002
+
+# csv query group by avg with projection
+query RT rowsort
+SELECT avg(c12), c1 FROM aggregate_test_100 GROUP BY c1;
+----
+0.410407092638 b
+0.486006692713 e
+0.487545174661 a
+0.488553793875 d
+0.660045653644 c
+
+# parallel projection
+query II
+SELECT c1, c2 FROM test ORDER BY c1 DESC, c2 ASC
+----
+3 0
+3 1
+3 2
+3 3
+3 4
+3 5
+3 6
+3 7
+3 8
+3 9
+3 10
+2 0
+2 1
+2 2
+2 3
+2 4
+2 5
+2 6
+2 7
+2 8
+2 9
+2 10
+1 0
+1 1
+1 2
+1 3
+1 4
+1 5
+1 6
+1 7
+1 8
+1 9
+1 10
+0 0
+0 1
+0 2
+0 3
+0 4
+0 5
+0 6
+0 7
+0 8
+0 9
+0 10
+
+# subquery alias case insensitive
+query II
+SELECT V1.c1, v1.C2 FROM (SELECT test_simple.C1, TEST_SIMPLE.c2 FROM test_simple) V1 ORDER BY v1.c1, V1.C2 LIMIT 1;
+----
+0 0
+
+# projection on table scan
+statement ok
+set datafusion.explain.logical_plan_only = true
+
+query TT
+EXPLAIN SELECT c2 FROM test;
+----
+logical_plan TableScan: test projection=[c2]
+
+statement count 44
+select c2 from test;
+
+statement ok
+set datafusion.explain.logical_plan_only = false
+
+# project cast dictionary
+query T
+SELECT
+ CASE
+ WHEN cpu_load_short.host IS NULL THEN ''
+ ELSE cpu_load_short.host
+ END AS host
+FROM
+ cpu_load_short;
+----
+host1
+host2
+
+# projection on memory scan
+query TT
+explain select b from memory_table;
+----
+logical_plan TableScan: memory_table projection=[b]
+physical_plan MemoryExec: partitions=1, partition_sizes=[1]
+
+query I
+select b from memory_table;
+----
+2
+12
+12
+120
+
+# project column with same name as relation
+query I
+select a.a from (select 1 as a) as a;
+----
+1
+
+# project column with filters that cant pushed down always false
+query I
+select * from (select 1 as a) f where f.a=2;
+----
+
+
+# project column with filters that cant pushed down always true
+query I
+select * from (select 1 as a) f where f.a=1;
+----
+1
+
+# project columns in memory without propagation
+query I
+SELECT column1 as a from (values (1), (2)) f where f.column1 = 2;
+----
+2
+
+# clean data
+statement ok
+DROP TABLE aggregate_simple;
+
+statement ok
+DROP TABLE aggregate_test_100;
+
+statement ok
+DROP TABLE memory_table;
+
+statement ok
+DROP TABLE cpu_load_short;
+
+statement ok
+DROP TABLE test;
+
+statement ok
+DROP TABLE test_simple;