This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5bf80d655d Minor: Add repartition_file.slt end to end test for 
repartitioning files, and supporting tweaks (#8505)
5bf80d655d is described below

commit 5bf80d655d545622d0272bc8e4bcf89dacaacf36
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Dec 13 16:07:01 2023 -0500

    Minor: Add repartition_file.slt end to end test for repartitioning files, 
and supporting tweaks (#8505)
    
    * Minor: Add repartition_file.slt end to end test for repartitioning files, 
and supporting tweaks
    
    * Sort files, update tests
---
 datafusion/core/src/datasource/listing/helpers.rs  |   8 +-
 datafusion/core/src/datasource/listing/mod.rs      |   5 +
 datafusion/core/src/datasource/listing/url.rs      |   4 +
 datafusion/sqllogictest/bin/sqllogictests.rs       |   2 +
 .../sqllogictest/test_files/repartition_scan.slt   | 268 +++++++++++++++++++++
 5 files changed, 286 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/datasource/listing/helpers.rs 
b/datafusion/core/src/datasource/listing/helpers.rs
index 3536c098bd..be74afa1f4 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -141,12 +141,18 @@ const CONCURRENCY_LIMIT: usize = 100;
 
 /// Partition the list of files into `n` groups
 pub fn split_files(
-    partitioned_files: Vec<PartitionedFile>,
+    mut partitioned_files: Vec<PartitionedFile>,
     n: usize,
 ) -> Vec<Vec<PartitionedFile>> {
     if partitioned_files.is_empty() {
         return vec![];
     }
+
+    // ObjectStore::list does not guarantee any consistent order and for some
+    // implementations, such as LocalFileSystem, it may be inconsistent. Thus,
+    // sort files by path to ensure consistent plans when run more than once.
+    partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));
+
     // effectively this is div with rounding up instead of truncating
     let chunk_size = (partitioned_files.len() + n - 1) / n;
     partitioned_files
diff --git a/datafusion/core/src/datasource/listing/mod.rs 
b/datafusion/core/src/datasource/listing/mod.rs
index 87c1663ae7..5e5b96f6ba 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -109,6 +109,11 @@ impl PartitionedFile {
         let size = std::fs::metadata(path.clone())?.len();
         Ok(Self::new(path, size))
     }
+
+    /// Return the path of this partitioned file
+    pub fn path(&self) -> &Path {
+        &self.object_meta.location
+    }
 }
 
 impl From<ObjectMeta> for PartitionedFile {
diff --git a/datafusion/core/src/datasource/listing/url.rs 
b/datafusion/core/src/datasource/listing/url.rs
index 9e9fb92100..979ed9e975 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -131,6 +131,10 @@ impl ListingTableUrl {
                     if is_directory {
                         fs::create_dir_all(path)?;
                     } else {
+                        // ensure parent directory exists
+                        if let Some(parent) = path.parent() {
+                            fs::create_dir_all(parent)?;
+                        }
                         fs::File::create(path)?;
                     }
                 }
diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs 
b/datafusion/sqllogictest/bin/sqllogictests.rs
index 618e3106c6..484677d58e 100644
--- a/datafusion/sqllogictest/bin/sqllogictests.rs
+++ b/datafusion/sqllogictest/bin/sqllogictests.rs
@@ -159,6 +159,7 @@ async fn run_test_file_with_postgres(test_file: TestFile) 
-> Result<()> {
         relative_path,
     } = test_file;
     info!("Running with Postgres runner: {}", path.display());
+    setup_scratch_dir(&relative_path)?;
     let mut runner =
         sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone()));
     runner.with_column_validator(strict_column_validator);
@@ -188,6 +189,7 @@ async fn run_complete_file(test_file: TestFile) -> 
Result<()> {
         info!("Skipping: {}", path.display());
         return Ok(());
     };
+    setup_scratch_dir(&relative_path)?;
     let mut runner = sqllogictest::Runner::new(|| async {
         Ok(DataFusion::new(
             test_ctx.session_ctx().clone(),
diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt 
b/datafusion/sqllogictest/test_files/repartition_scan.slt
new file mode 100644
index 0000000000..551d6d9ed4
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -0,0 +1,268 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for automatically reading files in parallel during scan
+##########
+
+# Set 4 partitions for deterministic output plans
+statement ok
+set datafusion.execution.target_partitions = 4;
+
+# automatically partition all files over 1 byte
+statement ok
+set datafusion.optimizer.repartition_file_min_size = 1;
+
+###################
+### Parquet tests
+###################
+
+# create a single parquet file
+# Note: the filename 2.parquet is used to test sorting (on local file systems it is often 
listed before 1.parquet)
+statement ok
+COPY  (VALUES (1), (2), (3), (4), (5)) TO 
'test_files/scratch/repartition_scan/parquet_table/2.parquet'
+(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
+
+statement ok
+CREATE EXTERNAL TABLE parquet_table(column1 int)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/repartition_scan/parquet_table/';
+
+query I
+select * from parquet_table;
+----
+1
+2
+3
+4
+5
+
+## Expect to see the scan read the file as "4" groups with even sizes (offsets)
+query TT
+EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42;
+----
+logical_plan
+Filter: parquet_table.column1 != Int32(42)
+--TableScan: parquet_table projection=[column1], 
partial_filters=[parquet_table.column1 != Int32(42)]
+physical_plan
+CoalesceBatchesExec: target_batch_size=8192
+--FilterExec: column1@0 != 42
+----ParquetExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]},
 project [...]
+
+# create a second parquet file
+statement ok
+COPY  (VALUES (100), (200)) TO 
'test_files/scratch/repartition_scan/parquet_table/1.parquet'
+(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
+
+## Still expect to see the scan read the file as "4" groups with even sizes. 
One group should read
+## parts of both files.
+query TT
+EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42 ORDER BY column1;
+----
+logical_plan
+Sort: parquet_table.column1 ASC NULLS LAST
+--Filter: parquet_table.column1 != Int32(42)
+----TableScan: parquet_table projection=[column1], 
partial_filters=[parquet_table.column1 != Int32(42)]
+physical_plan
+SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
+--SortExec: expr=[column1@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=8192
+------FilterExec: column1@0 != 42
+--------ParquetExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206],
 [WORKSPACE_RO [...]
+
+
+## Read the files as though they are ordered
+
+statement ok
+CREATE EXTERNAL TABLE parquet_table_with_order(column1 int)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/repartition_scan/parquet_table'
+WITH ORDER (column1 ASC);
+
+# output should be ordered
+query I
+SELECT column1 FROM parquet_table_with_order WHERE column1 <> 42 ORDER BY 
column1;
+----
+1
+2
+3
+4
+5
+100
+200
+
+# explain should not have any groups with more than one file, but currently does:
+# https://github.com/apache/arrow-datafusion/issues/8451
+query TT
+EXPLAIN SELECT column1 FROM parquet_table_with_order WHERE column1 <> 42 ORDER 
BY column1;
+----
+logical_plan
+Sort: parquet_table_with_order.column1 ASC NULLS LAST
+--Filter: parquet_table_with_order.column1 != Int32(42)
+----TableScan: parquet_table_with_order projection=[column1], 
partial_filters=[parquet_table_with_order.column1 != Int32(42)]
+physical_plan
+SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: column1@0 != 42
+------ParquetExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206],
 [WORKSPACE_ROOT [...]
+
+# Cleanup
+statement ok
+DROP TABLE parquet_table;
+
+statement ok
+DROP TABLE parquet_table_with_order;
+
+
+###################
+### CSV tests
+###################
+
+# Since parquet and CSV share most of the same implementation, this test checks
+# that the basics are connected properly
+
+# create a single csv file
+statement ok
+COPY  (VALUES (1), (2), (3), (4), (5)) TO 
'test_files/scratch/repartition_scan/csv_table/1.csv'
+(FORMAT csv, SINGLE_FILE_OUTPUT true, HEADER true);
+
+statement ok
+CREATE EXTERNAL TABLE csv_table(column1 int)
+STORED AS csv
+WITH HEADER ROW
+LOCATION 'test_files/scratch/repartition_scan/csv_table/';
+
+query I
+select * from csv_table;
+----
+1
+2
+3
+4
+5
+
+## Expect to see the scan read the file as "4" groups with even sizes (offsets)
+query TT
+EXPLAIN SELECT column1 FROM csv_table WHERE column1 <> 42;
+----
+logical_plan
+Filter: csv_table.column1 != Int32(42)
+--TableScan: csv_table projection=[column1], 
partial_filters=[csv_table.column1 != Int32(42)]
+physical_plan
+CoalesceBatchesExec: target_batch_size=8192
+--FilterExec: column1@0 != 42
+----CsvExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:0..5],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:5..10],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:10..15],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:15..18]]},
 projection=[column1], has_header=true
+
+# Cleanup
+statement ok
+DROP TABLE csv_table;
+
+
+###################
+### JSON tests
+###################
+
+# Since parquet and json share most of the same implementation, this test 
checks
+# that the basics are connected properly
+
+# create a single json file
+statement ok
+COPY  (VALUES (1), (2), (3), (4), (5)) TO 
'test_files/scratch/repartition_scan/json_table/1.json'
+(FORMAT json, SINGLE_FILE_OUTPUT true);
+
+statement ok
+CREATE EXTERNAL TABLE json_table(column1 int)
+STORED AS json
+LOCATION 'test_files/scratch/repartition_scan/json_table/';
+
+query I
+select * from json_table;
+----
+1
+2
+3
+4
+5
+
+## In the future it would be cool to see the file read as "4" groups with even 
sizes (offsets)
+## but for now it is just one group
+## https://github.com/apache/arrow-datafusion/issues/8502
+query TT
+EXPLAIN SELECT column1 FROM json_table WHERE column1 <> 42;
+----
+logical_plan
+Filter: json_table.column1 != Int32(42)
+--TableScan: json_table projection=[column1], 
partial_filters=[json_table.column1 != Int32(42)]
+physical_plan
+CoalesceBatchesExec: target_batch_size=8192
+--FilterExec: column1@0 != 42
+----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------JsonExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json]]},
 projection=[column1]
+
+
+# Cleanup
+statement ok
+DROP TABLE json_table;
+
+
+###################
+### Arrow File tests
+###################
+
+## Use pre-existing files, since we don't have a way to create arrow files yet
+## (https://github.com/apache/arrow-datafusion/issues/8504)
+statement ok
+CREATE EXTERNAL TABLE arrow_table
+STORED AS ARROW
+LOCATION '../core/tests/data/example.arrow';
+
+
+# It would be great to see the file read as "4" groups with even sizes 
(offsets) eventually
+# https://github.com/apache/arrow-datafusion/issues/8503
+query TT
+EXPLAIN SELECT * FROM arrow_table
+----
+logical_plan TableScan: arrow_table projection=[f0, f1, f2]
+physical_plan ArrowExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow]]}, projection=[f0, 
f1, f2]
+
+# Cleanup
+statement ok
+DROP TABLE arrow_table;
+
+###################
+### Avro File tests
+###################
+
+## Use pre-existing files, since we don't have a way to create avro files yet
+
+statement ok
+CREATE EXTERNAL TABLE avro_table
+STORED AS AVRO
+WITH HEADER ROW
+LOCATION '../../testing/data/avro/simple_enum.avro'
+
+
+# It would be great to see the file read as "4" groups with even sizes 
(offsets) eventually
+query TT
+EXPLAIN SELECT * FROM avro_table
+----
+logical_plan TableScan: avro_table projection=[f1, f2, f3]
+physical_plan AvroExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/avro/simple_enum.avro]]}, projection=[f1, f2, f3]
+
+# Cleanup
+statement ok
+DROP TABLE avro_table;

Reply via email to