This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e986e15a67 Refactor partitioned_csv tests (#8919)
e986e15a67 is described below

commit e986e15a67f75cdf0e9822dd0a5f6a2b669b0098
Author: Dejan Simic <[email protected]>
AuthorDate: Mon Jan 22 20:43:30 2024 +0100

    Refactor partitioned_csv tests (#8919)
---
 datafusion/core/tests/sql/mod.rs                 |  1 -
 datafusion/core/tests/sql/partitioned_csv.rs     | 77 ------------------------
 datafusion/core/tests/sql/select.rs              |  6 +-
 datafusion/sqllogictest/test_files/csv_files.slt | 73 ++++++++++++++++++++++
 4 files changed, 76 insertions(+), 81 deletions(-)

diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 981bdf34f5..40ae75cd7f 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -72,7 +72,6 @@ pub mod create_drop;
 pub mod explain_analyze;
 pub mod expr;
 pub mod joins;
-pub mod partitioned_csv;
 pub mod repartition;
 pub mod select;
 mod sql_api;
diff --git a/datafusion/core/tests/sql/partitioned_csv.rs b/datafusion/core/tests/sql/partitioned_csv.rs
deleted file mode 100644
index b77557a66c..0000000000
--- a/datafusion/core/tests/sql/partitioned_csv.rs
+++ /dev/null
@@ -1,77 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Utility functions for creating and running with a partitioned csv dataset.
-
-use std::{io::Write, sync::Arc};
-
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion::{
-    error::Result,
-    prelude::{CsvReadOptions, SessionConfig, SessionContext},
-};
-use tempfile::TempDir;
-
-/// Generate CSV partitions within the supplied directory
-fn populate_csv_partitions(
-    tmp_dir: &TempDir,
-    partition_count: usize,
-    file_extension: &str,
-) -> Result<SchemaRef> {
-    // define schema for data source (csv file)
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::UInt32, false),
-        Field::new("c2", DataType::UInt64, false),
-        Field::new("c3", DataType::Boolean, false),
-    ]));
-
-    // generate a partitioned file
-    for partition in 0..partition_count {
-        let filename = format!("partition-{partition}.{file_extension}");
-        let file_path = tmp_dir.path().join(filename);
-        let mut file = std::fs::File::create(file_path)?;
-
-        // generate some data
-        for i in 0..=10 {
-            let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
-            file.write_all(data.as_bytes())?;
-        }
-    }
-
-    Ok(schema)
-}
-
-/// Generate a partitioned CSV file and register it with an execution context
-pub async fn create_ctx(
-    tmp_dir: &TempDir,
-    partition_count: usize,
-) -> Result<SessionContext> {
-    let ctx =
-        SessionContext::new_with_config(SessionConfig::new().with_target_partitions(8));
-
-    let schema = populate_csv_partitions(tmp_dir, partition_count, ".csv")?;
-
-    // register csv file with the execution context
-    ctx.register_csv(
-        "test",
-        tmp_dir.path().to_str().unwrap(),
-        CsvReadOptions::new().schema(&schema),
-    )
-    .await?;
-
-    Ok(ctx)
-}
diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs
index cbdea9d729..4a782e54b0 100644
--- a/datafusion/core/tests/sql/select.rs
+++ b/datafusion/core/tests/sql/select.rs
@@ -482,7 +482,7 @@ async fn sort_on_window_null_string() -> Result<()> {
 async fn test_prepare_statement() -> Result<()> {
     let tmp_dir = TempDir::new()?;
     let partition_count = 4;
-    let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
+    let ctx = create_ctx_with_partition(&tmp_dir, partition_count).await?;
 
     // sql to statement then to prepare logical plan with parameters
     // c1 defined as UINT32, c2 defined as UInt64 but the params are Int32 and Float64
@@ -529,7 +529,7 @@ async fn test_prepare_statement() -> Result<()> {
 async fn test_named_query_parameters() -> Result<()> {
     let tmp_dir = TempDir::new()?;
     let partition_count = 4;
-    let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
+    let ctx = create_ctx_with_partition(&tmp_dir, partition_count).await?;
 
     // sql to statement then to logical plan with parameters
     // c1 defined as UINT32, c2 defined as UInt64
@@ -576,7 +576,7 @@ async fn test_named_query_parameters() -> Result<()> {
 async fn parallel_query_with_filter() -> Result<()> {
     let tmp_dir = TempDir::new()?;
     let partition_count = 4;
-    let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
+    let ctx = create_ctx_with_partition(&tmp_dir, partition_count).await?;
 
     let dataframe = ctx
         .sql("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt
index 9facb064bf..5393083e6c 100644
--- a/datafusion/sqllogictest/test_files/csv_files.slt
+++ b/datafusion/sqllogictest/test_files/csv_files.slt
@@ -63,3 +63,76 @@ id6 value"6
 id7 value"7
 id8 value"8
 id9 value"9
+
+
+# Read partitioned csv
+statement ok
+CREATE TABLE src_table_1 (
+  int_col INT,
+  string_col TEXT,
+  bigint_col BIGINT,
+  partition_col INT
+) AS VALUES
+(1, 'aaa', 100, 1),
+(2, 'bbb', 200, 1),
+(3, 'ccc', 300, 1),
+(4, 'ddd', 400, 1);
+
+statement ok
+CREATE TABLE src_table_2 (
+  int_col INT,
+  string_col TEXT,
+  bigint_col BIGINT,
+  partition_col INT
+) AS VALUES
+(5, 'eee', 500, 2),
+(6, 'fff', 600, 2),
+(7, 'ggg', 700, 2),
+(8, 'hhh', 800, 2);
+
+query ITII
+COPY  src_table_1 TO 'test_files/scratch/csv_files/csv_partitions/1.csv'
+(FORMAT CSV, SINGLE_FILE_OUTPUT true);
+----
+4
+
+
+query ITII
+COPY  src_table_2 TO 'test_files/scratch/csv_files/csv_partitions/2.csv'
+(FORMAT CSV, SINGLE_FILE_OUTPUT true);
+----
+4
+
+statement ok
+CREATE EXTERNAL TABLE partitioned_table (
+  int_col INT,
+  string_col TEXT,
+  bigint_col BIGINT,
+  partition_col INT
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION 'test_files/scratch/csv_files/csv_partitions';
+
+query ITII
+SELECT * FROM partitioned_table ORDER BY int_col;
+----
+1 aaa 100 1
+2 bbb 200 1
+3 ccc 300 1
+4 ddd 400 1
+5 eee 500 2
+6 fff 600 2
+7 ggg 700 2
+8 hhh 800 2
+
+query TT
+EXPLAIN SELECT * FROM partitioned_table ORDER BY int_col;
+----
+logical_plan
+Sort: partitioned_table.int_col ASC NULLS LAST
+--TableScan: partitioned_table projection=[int_col, string_col, bigint_col, partition_col]
+physical_plan
+SortPreservingMergeExec: [int_col@0 ASC NULLS LAST]
+--SortExec: expr=[int_col@0 ASC NULLS LAST]
+----CsvExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/csv_files/csv_partitions/1.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/csv_files/csv_partitions/2.csv]]}, projection=[int_col, string_col, bigint_col, partition_col], has_header=true

Reply via email to