This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new e52f844 Add partitioned_csv setup code to sql_integration test (#1743)
e52f844 is described below
commit e52f844427cc80fc6551851117266f5e2372ec6c
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Feb 5 06:10:35 2022 -0500
Add partitioned_csv setup code to sql_integration test (#1743)
---
datafusion/src/execution/context.rs | 239 +-------------------------------
datafusion/tests/sql/mod.rs | 1 +
datafusion/tests/sql/partitioned_csv.rs | 95 +++++++++++++
datafusion/tests/sql/projection.rs | 192 +++++++++++++++++++++++++
datafusion/tests/sql/select.rs | 59 +++++++-
5 files changed, 347 insertions(+), 239 deletions(-)
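This change adds a shared partitioned_csv helper module under datafusion/tests/sql/ and moves several projection and filter tests out of the context.rs unit tests so they can reuse it. A minimal sketch of the usage pattern the commit introduces (the test name below is illustrative, not part of the commit):

    // A hypothetical test placed under datafusion/tests/sql/ (so it is compiled
    // into the sql_integration test binary) can call the new helpers directly.
    use super::*;

    #[tokio::test]
    async fn query_partitioned_csv() -> Result<()> {
        // execute() builds a TempDir with 4 CSV partitions, registers them as
        // table `test`, runs the SQL, and returns the collected RecordBatches.
        let results = partitioned_csv::execute("SELECT c1, c2 FROM test", 4).await?;
        assert!(!results.is_empty());
        Ok(())
    }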
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index fb271a1..96e49c8 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -1281,11 +1281,9 @@ mod tests {
use super::*;
use crate::execution::context::QueryPlanner;
use crate::from_slice::FromSlice;
- use crate::logical_plan::plan::Projection;
- use crate::logical_plan::TableScan;
use crate::logical_plan::{binary_expr, lit, Operator};
+ use crate::physical_plan::collect;
use crate::physical_plan::functions::{make_scalar_function, Volatility};
- use crate::physical_plan::{collect, collect_partitioned};
use crate::test;
use crate::variable::VarType;
use crate::{
@@ -1311,7 +1309,6 @@ mod tests {
use std::thread::{self, JoinHandle};
use std::{io::prelude::*, sync::Mutex};
use tempfile::TempDir;
- use test::*;
#[tokio::test]
async fn shared_memory_and_disk_manager() {
@@ -1348,62 +1345,6 @@ mod tests {
}
#[tokio::test]
- async fn parallel_projection() -> Result<()> {
- let partition_count = 4;
- let results = execute("SELECT c1, c2 FROM test",
partition_count).await?;
-
- let expected = vec![
- "+----+----+",
- "| c1 | c2 |",
- "+----+----+",
- "| 3 | 1 |",
- "| 3 | 2 |",
- "| 3 | 3 |",
- "| 3 | 4 |",
- "| 3 | 5 |",
- "| 3 | 6 |",
- "| 3 | 7 |",
- "| 3 | 8 |",
- "| 3 | 9 |",
- "| 3 | 10 |",
- "| 2 | 1 |",
- "| 2 | 2 |",
- "| 2 | 3 |",
- "| 2 | 4 |",
- "| 2 | 5 |",
- "| 2 | 6 |",
- "| 2 | 7 |",
- "| 2 | 8 |",
- "| 2 | 9 |",
- "| 2 | 10 |",
- "| 1 | 1 |",
- "| 1 | 2 |",
- "| 1 | 3 |",
- "| 1 | 4 |",
- "| 1 | 5 |",
- "| 1 | 6 |",
- "| 1 | 7 |",
- "| 1 | 8 |",
- "| 1 | 9 |",
- "| 1 | 10 |",
- "| 0 | 1 |",
- "| 0 | 2 |",
- "| 0 | 3 |",
- "| 0 | 4 |",
- "| 0 | 5 |",
- "| 0 | 6 |",
- "| 0 | 7 |",
- "| 0 | 8 |",
- "| 0 | 9 |",
- "| 0 | 10 |",
- "+----+----+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
async fn create_variable_expr() -> Result<()> {
let tmp_dir = TempDir::new()?;
let partition_count = 4;
@@ -1448,184 +1389,6 @@ mod tests {
}
#[tokio::test]
- async fn parallel_query_with_filter() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let partition_count = 4;
- let ctx = create_ctx(&tmp_dir, partition_count).await?;
-
- let logical_plan =
- ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND
c1 < 3")?;
- let logical_plan = ctx.optimize(&logical_plan)?;
-
- let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
-
- let runtime = ctx.state.lock().runtime_env.clone();
- let results = collect_partitioned(physical_plan, runtime).await?;
-
- // note that the order of partitions is not deterministic
- let mut num_rows = 0;
- for partition in &results {
- for batch in partition {
- num_rows += batch.num_rows();
- }
- }
- assert_eq!(20, num_rows);
-
- let results: Vec<RecordBatch> =
results.into_iter().flatten().collect();
- let expected = vec![
- "+----+----+",
- "| c1 | c2 |",
- "+----+----+",
- "| 1 | 1 |",
- "| 1 | 10 |",
- "| 1 | 2 |",
- "| 1 | 3 |",
- "| 1 | 4 |",
- "| 1 | 5 |",
- "| 1 | 6 |",
- "| 1 | 7 |",
- "| 1 | 8 |",
- "| 1 | 9 |",
- "| 2 | 1 |",
- "| 2 | 10 |",
- "| 2 | 2 |",
- "| 2 | 3 |",
- "| 2 | 4 |",
- "| 2 | 5 |",
- "| 2 | 6 |",
- "| 2 | 7 |",
- "| 2 | 8 |",
- "| 2 | 9 |",
- "+----+----+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn projection_on_table_scan() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let partition_count = 4;
- let ctx = create_ctx(&tmp_dir, partition_count).await?;
- let runtime = ctx.state.lock().runtime_env.clone();
-
- let table = ctx.table("test")?;
- let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
- .project(vec![col("c2")])?
- .build()?;
-
- let optimized_plan = ctx.optimize(&logical_plan)?;
- match &optimized_plan {
- LogicalPlan::Projection(Projection { input, .. }) => match
&**input {
- LogicalPlan::TableScan(TableScan {
- source,
- projected_schema,
- ..
- }) => {
- assert_eq!(source.schema().fields().len(), 3);
- assert_eq!(projected_schema.fields().len(), 1);
- }
- _ => panic!("input to projection should be TableScan"),
- },
- _ => panic!("expect optimized_plan to be projection"),
- }
-
- let expected = "Projection: #test.c2\
- \n TableScan: test projection=Some([1])";
- assert_eq!(format!("{:?}", optimized_plan), expected);
-
- let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
-
- assert_eq!(1, physical_plan.schema().fields().len());
- assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
-
- let batches = collect(physical_plan, runtime).await?;
- assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::<usize>());
-
- Ok(())
- }
-
- #[tokio::test]
- async fn preserve_nullability_on_projection() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let ctx = create_ctx(&tmp_dir, 1).await?;
-
- let schema: Schema = ctx.table("test").unwrap().schema().clone().into();
- assert!(!schema.field_with_name("c1")?.is_nullable());
-
- let plan = LogicalPlanBuilder::scan_empty(None, &schema, None)?
- .project(vec![col("c1")])?
- .build()?;
-
- let plan = ctx.optimize(&plan)?;
- let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
- assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable());
- Ok(())
- }
-
- #[tokio::test]
- async fn projection_on_memory_scan() -> Result<()> {
- let schema = Schema::new(vec![
- Field::new("a", DataType::Int32, false),
- Field::new("b", DataType::Int32, false),
- Field::new("c", DataType::Int32, false),
- ]);
- let schema = SchemaRef::new(schema);
-
- let partitions = vec![vec![RecordBatch::try_new(
- schema.clone(),
- vec![
- Arc::new(Int32Array::from_slice(&[1, 10, 10, 100])),
- Arc::new(Int32Array::from_slice(&[2, 12, 12, 120])),
- Arc::new(Int32Array::from_slice(&[3, 12, 12, 120])),
- ],
- )?]];
-
- let plan = LogicalPlanBuilder::scan_memory(partitions, schema, None)?
- .project(vec![col("b")])?
- .build()?;
- assert_fields_eq(&plan, vec!["b"]);
-
- let ctx = ExecutionContext::new();
- let optimized_plan = ctx.optimize(&plan)?;
- match &optimized_plan {
- LogicalPlan::Projection(Projection { input, .. }) => match
&**input {
- LogicalPlan::TableScan(TableScan {
- source,
- projected_schema,
- ..
- }) => {
- assert_eq!(source.schema().fields().len(), 3);
- assert_eq!(projected_schema.fields().len(), 1);
- }
- _ => panic!("input to projection should be InMemoryScan"),
- },
- _ => panic!("expect optimized_plan to be projection"),
- }
-
- let expected = format!(
- "Projection: #{}.b\
- \n TableScan: {} projection=Some([1])",
- UNNAMED_TABLE, UNNAMED_TABLE
- );
- assert_eq!(format!("{:?}", optimized_plan), expected);
-
- let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
-
- assert_eq!(1, physical_plan.schema().fields().len());
- assert_eq!("b", physical_plan.schema().field(0).name().as_str());
-
- let runtime = ctx.state.lock().runtime_env.clone();
- let batches = collect(physical_plan, runtime).await?;
- assert_eq!(1, batches.len());
- assert_eq!(1, batches[0].num_columns());
- assert_eq!(4, batches[0].num_rows());
-
- Ok(())
- }
-
- #[tokio::test]
async fn sort() -> Result<()> {
let results =
execute("SELECT c1, c2 FROM test ORDER BY c1 DESC, c2 ASC",
4).await?;
diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs
index 95623d4..468762e 100644
--- a/datafusion/tests/sql/mod.rs
+++ b/datafusion/tests/sql/mod.rs
@@ -98,6 +98,7 @@ pub mod window;
mod explain;
pub mod information_schema;
+mod partitioned_csv;
#[cfg_attr(not(feature = "unicode_expressions"), ignore)]
pub mod unicode;
diff --git a/datafusion/tests/sql/partitioned_csv.rs b/datafusion/tests/sql/partitioned_csv.rs
new file mode 100644
index 0000000..5efc837
--- /dev/null
+++ b/datafusion/tests/sql/partitioned_csv.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utility functions for running with a partitioned csv dataset:
+
+use std::{io::Write, sync::Arc};
+
+use arrow::{
+ datatypes::{DataType, Field, Schema, SchemaRef},
+ record_batch::RecordBatch,
+};
+use datafusion::{
+ error::Result,
+ prelude::{CsvReadOptions, ExecutionConfig, ExecutionContext},
+};
+use tempfile::TempDir;
+
+/// Execute SQL and return results
+async fn plan_and_collect(
+ ctx: &mut ExecutionContext,
+ sql: &str,
+) -> Result<Vec<RecordBatch>> {
+ ctx.sql(sql).await?.collect().await
+}
+
+/// Execute SQL and return results
+pub async fn execute(sql: &str, partition_count: usize) ->
Result<Vec<RecordBatch>> {
+ let tmp_dir = TempDir::new()?;
+ let mut ctx = create_ctx(&tmp_dir, partition_count).await?;
+ plan_and_collect(&mut ctx, sql).await
+}
+
+/// Generate CSV partitions within the supplied directory
+fn populate_csv_partitions(
+ tmp_dir: &TempDir,
+ partition_count: usize,
+ file_extension: &str,
+) -> Result<SchemaRef> {
+ // define schema for data source (csv file)
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::UInt32, false),
+ Field::new("c2", DataType::UInt64, false),
+ Field::new("c3", DataType::Boolean, false),
+ ]));
+
+ // generate a partitioned file
+ for partition in 0..partition_count {
+ let filename = format!("partition-{}.{}", partition, file_extension);
+ let file_path = tmp_dir.path().join(&filename);
+ let mut file = std::fs::File::create(file_path)?;
+
+ // generate some data
+ for i in 0..=10 {
+ let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
+ file.write_all(data.as_bytes())?;
+ }
+ }
+
+ Ok(schema)
+}
+
+/// Generate a partitioned CSV file and register it with an execution context
+pub async fn create_ctx(
+ tmp_dir: &TempDir,
+ partition_count: usize,
+) -> Result<ExecutionContext> {
+ let mut ctx =
+ ExecutionContext::with_config(ExecutionConfig::new().with_target_partitions(8));
+
+ let schema = populate_csv_partitions(tmp_dir, partition_count, ".csv")?;
+
+ // register csv file with the execution context
+ ctx.register_csv(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ CsvReadOptions::new().schema(&schema),
+ )
+ .await?;
+
+ Ok(ctx)
+}
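Tests that need the ExecutionContext itself (for example to optimize and inspect a plan, as the relocated projection and select tests below do) call create_ctx directly. A hedged sketch of that pattern, again with an illustrative test name:

    use super::*;
    use tempfile::TempDir;

    #[tokio::test]
    async fn inspect_partitioned_csv_plan() -> Result<()> {
        // The TempDir must outlive the context: the generated CSV partitions
        // live inside it and the `test` table is registered against that path.
        let tmp_dir = TempDir::new()?;
        let ctx = partitioned_csv::create_ctx(&tmp_dir, 4).await?;

        let logical_plan = ctx.create_logical_plan("SELECT c1 FROM test")?;
        let optimized = ctx.optimize(&logical_plan)?;
        let physical_plan = ctx.create_physical_plan(&optimized).await?;
        assert_eq!(1, physical_plan.schema().fields().len());
        Ok(())
    }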
diff --git a/datafusion/tests/sql/projection.rs b/datafusion/tests/sql/projection.rs
index 57fa598..0a956a9 100644
--- a/datafusion/tests/sql/projection.rs
+++ b/datafusion/tests/sql/projection.rs
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE};
+use tempfile::TempDir;
+
use super::*;
#[tokio::test]
@@ -73,3 +76,192 @@ async fn csv_query_group_by_avg_with_projection() ->
Result<()> {
assert_batches_sorted_eq!(expected, &actual);
Ok(())
}
+
+#[tokio::test]
+async fn parallel_projection() -> Result<()> {
+ let partition_count = 4;
+ let results =
+ partitioned_csv::execute("SELECT c1, c2 FROM test",
partition_count).await?;
+
+ let expected = vec![
+ "+----+----+",
+ "| c1 | c2 |",
+ "+----+----+",
+ "| 3 | 1 |",
+ "| 3 | 2 |",
+ "| 3 | 3 |",
+ "| 3 | 4 |",
+ "| 3 | 5 |",
+ "| 3 | 6 |",
+ "| 3 | 7 |",
+ "| 3 | 8 |",
+ "| 3 | 9 |",
+ "| 3 | 10 |",
+ "| 2 | 1 |",
+ "| 2 | 2 |",
+ "| 2 | 3 |",
+ "| 2 | 4 |",
+ "| 2 | 5 |",
+ "| 2 | 6 |",
+ "| 2 | 7 |",
+ "| 2 | 8 |",
+ "| 2 | 9 |",
+ "| 2 | 10 |",
+ "| 1 | 1 |",
+ "| 1 | 2 |",
+ "| 1 | 3 |",
+ "| 1 | 4 |",
+ "| 1 | 5 |",
+ "| 1 | 6 |",
+ "| 1 | 7 |",
+ "| 1 | 8 |",
+ "| 1 | 9 |",
+ "| 1 | 10 |",
+ "| 0 | 1 |",
+ "| 0 | 2 |",
+ "| 0 | 3 |",
+ "| 0 | 4 |",
+ "| 0 | 5 |",
+ "| 0 | 6 |",
+ "| 0 | 7 |",
+ "| 0 | 8 |",
+ "| 0 | 9 |",
+ "| 0 | 10 |",
+ "+----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn projection_on_table_scan() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+ let partition_count = 4;
+ let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
+ let runtime = ctx.state.lock().runtime_env.clone();
+
+ let table = ctx.table("test")?;
+ let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
+ .project(vec![col("c2")])?
+ .build()?;
+
+ let optimized_plan = ctx.optimize(&logical_plan)?;
+ match &optimized_plan {
+ LogicalPlan::Projection(Projection { input, .. }) => match &**input {
+ LogicalPlan::TableScan(TableScan {
+ source,
+ projected_schema,
+ ..
+ }) => {
+ assert_eq!(source.schema().fields().len(), 3);
+ assert_eq!(projected_schema.fields().len(), 1);
+ }
+ _ => panic!("input to projection should be TableScan"),
+ },
+ _ => panic!("expect optimized_plan to be projection"),
+ }
+
+ let expected = "Projection: #test.c2\
+ \n TableScan: test projection=Some([1])";
+ assert_eq!(format!("{:?}", optimized_plan), expected);
+
+ let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
+
+ assert_eq!(1, physical_plan.schema().fields().len());
+ assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
+
+ let batches = collect(physical_plan, runtime).await?;
+ assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::<usize>());
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn preserve_nullability_on_projection() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+ let ctx = partitioned_csv::create_ctx(&tmp_dir, 1).await?;
+
+ let schema: Schema = ctx.table("test").unwrap().schema().clone().into();
+ assert!(!schema.field_with_name("c1")?.is_nullable());
+
+ let plan = LogicalPlanBuilder::scan_empty(None, &schema, None)?
+ .project(vec![col("c1")])?
+ .build()?;
+
+ let plan = ctx.optimize(&plan)?;
+ let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
+ assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable());
+ Ok(())
+}
+
+#[tokio::test]
+async fn projection_on_memory_scan() -> Result<()> {
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int32, false),
+ ]);
+ let schema = SchemaRef::new(schema);
+
+ let partitions = vec![vec![RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(Int32Array::from_slice(&[1, 10, 10, 100])),
+ Arc::new(Int32Array::from_slice(&[2, 12, 12, 120])),
+ Arc::new(Int32Array::from_slice(&[3, 12, 12, 120])),
+ ],
+ )?]];
+
+ let plan = LogicalPlanBuilder::scan_memory(partitions, schema, None)?
+ .project(vec![col("b")])?
+ .build()?;
+ assert_fields_eq(&plan, vec!["b"]);
+
+ let ctx = ExecutionContext::new();
+ let optimized_plan = ctx.optimize(&plan)?;
+ match &optimized_plan {
+ LogicalPlan::Projection(Projection { input, .. }) => match &**input {
+ LogicalPlan::TableScan(TableScan {
+ source,
+ projected_schema,
+ ..
+ }) => {
+ assert_eq!(source.schema().fields().len(), 3);
+ assert_eq!(projected_schema.fields().len(), 1);
+ }
+ _ => panic!("input to projection should be InMemoryScan"),
+ },
+ _ => panic!("expect optimized_plan to be projection"),
+ }
+
+ let expected = format!(
+ "Projection: #{}.b\
+ \n TableScan: {} projection=Some([1])",
+ UNNAMED_TABLE, UNNAMED_TABLE
+ );
+ assert_eq!(format!("{:?}", optimized_plan), expected);
+
+ let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
+
+ assert_eq!(1, physical_plan.schema().fields().len());
+ assert_eq!("b", physical_plan.schema().field(0).name().as_str());
+
+ let runtime = ctx.state.lock().runtime_env.clone();
+ let batches = collect(physical_plan, runtime).await?;
+ assert_eq!(1, batches.len());
+ assert_eq!(1, batches[0].num_columns());
+ assert_eq!(4, batches[0].num_rows());
+
+ Ok(())
+}
+
+fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) {
+ let actual: Vec<String> = plan
+ .schema()
+ .fields()
+ .iter()
+ .map(|f| f.name().clone())
+ .collect();
+ assert_eq!(actual, expected);
+}
diff --git a/datafusion/tests/sql/select.rs b/datafusion/tests/sql/select.rs
index 759a45c..02869dd 100644
--- a/datafusion/tests/sql/select.rs
+++ b/datafusion/tests/sql/select.rs
@@ -16,7 +16,8 @@
// under the License.
use super::*;
-use datafusion::from_slice::FromSlice;
+use datafusion::{from_slice::FromSlice, physical_plan::collect_partitioned};
+use tempfile::TempDir;
#[tokio::test]
async fn all_where_empty() -> Result<()> {
@@ -928,3 +929,59 @@ async fn csv_select_nested() -> Result<()> {
assert_batches_eq!(expected, &actual);
Ok(())
}
+
+#[tokio::test]
+async fn parallel_query_with_filter() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+ let partition_count = 4;
+ let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
+
+ let logical_plan =
+ ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 <
3")?;
+ let logical_plan = ctx.optimize(&logical_plan)?;
+
+ let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
+
+ let runtime = ctx.state.lock().runtime_env.clone();
+ let results = collect_partitioned(physical_plan, runtime).await?;
+
+ // note that the order of partitions is not deterministic
+ let mut num_rows = 0;
+ for partition in &results {
+ for batch in partition {
+ num_rows += batch.num_rows();
+ }
+ }
+ assert_eq!(20, num_rows);
+
+ let results: Vec<RecordBatch> = results.into_iter().flatten().collect();
+ let expected = vec![
+ "+----+----+",
+ "| c1 | c2 |",
+ "+----+----+",
+ "| 1 | 1 |",
+ "| 1 | 10 |",
+ "| 1 | 2 |",
+ "| 1 | 3 |",
+ "| 1 | 4 |",
+ "| 1 | 5 |",
+ "| 1 | 6 |",
+ "| 1 | 7 |",
+ "| 1 | 8 |",
+ "| 1 | 9 |",
+ "| 2 | 1 |",
+ "| 2 | 10 |",
+ "| 2 | 2 |",
+ "| 2 | 3 |",
+ "| 2 | 4 |",
+ "| 2 | 5 |",
+ "| 2 | 6 |",
+ "| 2 | 7 |",
+ "| 2 | 8 |",
+ "| 2 | 9 |",
+ "+----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}