This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e986e15a67 Refactor partitioned_csv tests (#8919)
e986e15a67 is described below
commit e986e15a67f75cdf0e9822dd0a5f6a2b669b0098
Author: Dejan Simic <[email protected]>
AuthorDate: Mon Jan 22 20:43:30 2024 +0100
Refactor partitioned_csv tests (#8919)
---
datafusion/core/tests/sql/mod.rs | 1 -
datafusion/core/tests/sql/partitioned_csv.rs | 77 ------------------------
datafusion/core/tests/sql/select.rs | 6 +-
datafusion/sqllogictest/test_files/csv_files.slt | 73 ++++++++++++++++++++++
4 files changed, 76 insertions(+), 81 deletions(-)
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 981bdf34f5..40ae75cd7f 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -72,7 +72,6 @@ pub mod create_drop;
pub mod explain_analyze;
pub mod expr;
pub mod joins;
-pub mod partitioned_csv;
pub mod repartition;
pub mod select;
mod sql_api;
diff --git a/datafusion/core/tests/sql/partitioned_csv.rs b/datafusion/core/tests/sql/partitioned_csv.rs
deleted file mode 100644
index b77557a66c..0000000000
--- a/datafusion/core/tests/sql/partitioned_csv.rs
+++ /dev/null
@@ -1,77 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Utility functions for creating and running with a partitioned csv dataset.
-
-use std::{io::Write, sync::Arc};
-
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion::{
- error::Result,
- prelude::{CsvReadOptions, SessionConfig, SessionContext},
-};
-use tempfile::TempDir;
-
-/// Generate CSV partitions within the supplied directory
-fn populate_csv_partitions(
- tmp_dir: &TempDir,
- partition_count: usize,
- file_extension: &str,
-) -> Result<SchemaRef> {
- // define schema for data source (csv file)
- let schema = Arc::new(Schema::new(vec![
- Field::new("c1", DataType::UInt32, false),
- Field::new("c2", DataType::UInt64, false),
- Field::new("c3", DataType::Boolean, false),
- ]));
-
- // generate a partitioned file
- for partition in 0..partition_count {
- let filename = format!("partition-{partition}.{file_extension}");
- let file_path = tmp_dir.path().join(filename);
- let mut file = std::fs::File::create(file_path)?;
-
- // generate some data
- for i in 0..=10 {
- let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
- file.write_all(data.as_bytes())?;
- }
- }
-
- Ok(schema)
-}
-
-/// Generate a partitioned CSV file and register it with an execution context
-pub async fn create_ctx(
- tmp_dir: &TempDir,
- partition_count: usize,
-) -> Result<SessionContext> {
- let ctx =
- SessionContext::new_with_config(SessionConfig::new().with_target_partitions(8));
-
- let schema = populate_csv_partitions(tmp_dir, partition_count, ".csv")?;
-
- // register csv file with the execution context
- ctx.register_csv(
- "test",
- tmp_dir.path().to_str().unwrap(),
- CsvReadOptions::new().schema(&schema),
- )
- .await?;
-
- Ok(ctx)
-}
diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs
index cbdea9d729..4a782e54b0 100644
--- a/datafusion/core/tests/sql/select.rs
+++ b/datafusion/core/tests/sql/select.rs
@@ -482,7 +482,7 @@ async fn sort_on_window_null_string() -> Result<()> {
async fn test_prepare_statement() -> Result<()> {
let tmp_dir = TempDir::new()?;
let partition_count = 4;
- let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
+ let ctx = create_ctx_with_partition(&tmp_dir, partition_count).await?;
// sql to statement then to prepare logical plan with parameters
// c1 defined as UINT32, c2 defined as UInt64 but the params are Int32 and Float64
@@ -529,7 +529,7 @@ async fn test_prepare_statement() -> Result<()> {
async fn test_named_query_parameters() -> Result<()> {
let tmp_dir = TempDir::new()?;
let partition_count = 4;
- let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
+ let ctx = create_ctx_with_partition(&tmp_dir, partition_count).await?;
// sql to statement then to logical plan with parameters
// c1 defined as UINT32, c2 defined as UInt64
@@ -576,7 +576,7 @@ async fn test_named_query_parameters() -> Result<()> {
async fn parallel_query_with_filter() -> Result<()> {
let tmp_dir = TempDir::new()?;
let partition_count = 4;
- let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
+ let ctx = create_ctx_with_partition(&tmp_dir, partition_count).await?;
let dataframe = ctx
.sql("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt
index 9facb064bf..5393083e6c 100644
--- a/datafusion/sqllogictest/test_files/csv_files.slt
+++ b/datafusion/sqllogictest/test_files/csv_files.slt
@@ -63,3 +63,76 @@ id6 value"6
id7 value"7
id8 value"8
id9 value"9
+
+
+# Read partitioned csv
+statement ok
+CREATE TABLE src_table_1 (
+ int_col INT,
+ string_col TEXT,
+ bigint_col BIGINT,
+ partition_col INT
+) AS VALUES
+(1, 'aaa', 100, 1),
+(2, 'bbb', 200, 1),
+(3, 'ccc', 300, 1),
+(4, 'ddd', 400, 1);
+
+statement ok
+CREATE TABLE src_table_2 (
+ int_col INT,
+ string_col TEXT,
+ bigint_col BIGINT,
+ partition_col INT
+) AS VALUES
+(5, 'eee', 500, 2),
+(6, 'fff', 600, 2),
+(7, 'ggg', 700, 2),
+(8, 'hhh', 800, 2);
+
+query ITII
+COPY src_table_1 TO 'test_files/scratch/csv_files/csv_partitions/1.csv'
+(FORMAT CSV, SINGLE_FILE_OUTPUT true);
+----
+4
+
+
+query ITII
+COPY src_table_2 TO 'test_files/scratch/csv_files/csv_partitions/2.csv'
+(FORMAT CSV, SINGLE_FILE_OUTPUT true);
+----
+4
+
+statement ok
+CREATE EXTERNAL TABLE partitioned_table (
+ int_col INT,
+ string_col TEXT,
+ bigint_col BIGINT,
+ partition_col INT
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION 'test_files/scratch/csv_files/csv_partitions';
+
+query ITII
+SELECT * FROM partitioned_table ORDER BY int_col;
+----
+1 aaa 100 1
+2 bbb 200 1
+3 ccc 300 1
+4 ddd 400 1
+5 eee 500 2
+6 fff 600 2
+7 ggg 700 2
+8 hhh 800 2
+
+query TT
+EXPLAIN SELECT * FROM partitioned_table ORDER BY int_col;
+----
+logical_plan
+Sort: partitioned_table.int_col ASC NULLS LAST
+--TableScan: partitioned_table projection=[int_col, string_col, bigint_col, partition_col]
+physical_plan
+SortPreservingMergeExec: [int_col@0 ASC NULLS LAST]
+--SortExec: expr=[int_col@0 ASC NULLS LAST]
+----CsvExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/csv_files/csv_partitions/1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/csv_files/csv_partitions/2.csv]]}, projection=[int_col, string_col, bigint_col, partition_col], has_header=true