This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5bf80d655d Minor: Add repartition_file.slt end to end test for
repartitioning files, and supporting tweaks (#8505)
5bf80d655d is described below
commit 5bf80d655d545622d0272bc8e4bcf89dacaacf36
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Dec 13 16:07:01 2023 -0500
Minor: Add repartition_file.slt end to end test for repartitioning files,
and supporting tweaks (#8505)
* Minor: Add repartition_file.slt end to end test for repartitioning files,
and supporting tweaks
* Sort files, update tests
---
datafusion/core/src/datasource/listing/helpers.rs | 8 +-
datafusion/core/src/datasource/listing/mod.rs | 5 +
datafusion/core/src/datasource/listing/url.rs | 4 +
datafusion/sqllogictest/bin/sqllogictests.rs | 2 +
.../sqllogictest/test_files/repartition_scan.slt | 268 +++++++++++++++++++++
5 files changed, 286 insertions(+), 1 deletion(-)
diff --git a/datafusion/core/src/datasource/listing/helpers.rs
b/datafusion/core/src/datasource/listing/helpers.rs
index 3536c098bd..be74afa1f4 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -141,12 +141,18 @@ const CONCURRENCY_LIMIT: usize = 100;
/// Partition the list of files into `n` groups
pub fn split_files(
- partitioned_files: Vec<PartitionedFile>,
+ mut partitioned_files: Vec<PartitionedFile>,
n: usize,
) -> Vec<Vec<PartitionedFile>> {
if partitioned_files.is_empty() {
return vec![];
}
+
+ // ObjectStore::list does not guarantee any consistent order and for some
+ // implementations such as LocalFileSystem, it may be inconsistent. Thus,
+ // sort files by path to ensure consistent plans when run more than once.
+ partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));
+
// effectively this is div with rounding up instead of truncating
let chunk_size = (partitioned_files.len() + n - 1) / n;
partitioned_files
diff --git a/datafusion/core/src/datasource/listing/mod.rs
b/datafusion/core/src/datasource/listing/mod.rs
index 87c1663ae7..5e5b96f6ba 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -109,6 +109,11 @@ impl PartitionedFile {
let size = std::fs::metadata(path.clone())?.len();
Ok(Self::new(path, size))
}
+
+ /// Return the path of this partitioned file
+ pub fn path(&self) -> &Path {
+ &self.object_meta.location
+ }
}
impl From<ObjectMeta> for PartitionedFile {
diff --git a/datafusion/core/src/datasource/listing/url.rs
b/datafusion/core/src/datasource/listing/url.rs
index 9e9fb92100..979ed9e975 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -131,6 +131,10 @@ impl ListingTableUrl {
if is_directory {
fs::create_dir_all(path)?;
} else {
+ // ensure parent directory exists
+ if let Some(parent) = path.parent() {
+ fs::create_dir_all(parent)?;
+ }
fs::File::create(path)?;
}
}
diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs
b/datafusion/sqllogictest/bin/sqllogictests.rs
index 618e3106c6..484677d58e 100644
--- a/datafusion/sqllogictest/bin/sqllogictests.rs
+++ b/datafusion/sqllogictest/bin/sqllogictests.rs
@@ -159,6 +159,7 @@ async fn run_test_file_with_postgres(test_file: TestFile)
-> Result<()> {
relative_path,
} = test_file;
info!("Running with Postgres runner: {}", path.display());
+ setup_scratch_dir(&relative_path)?;
let mut runner =
sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone()));
runner.with_column_validator(strict_column_validator);
@@ -188,6 +189,7 @@ async fn run_complete_file(test_file: TestFile) ->
Result<()> {
info!("Skipping: {}", path.display());
return Ok(());
};
+ setup_scratch_dir(&relative_path)?;
let mut runner = sqllogictest::Runner::new(|| async {
Ok(DataFusion::new(
test_ctx.session_ctx().clone(),
diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt
b/datafusion/sqllogictest/test_files/repartition_scan.slt
new file mode 100644
index 0000000000..551d6d9ed4
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -0,0 +1,268 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for automatically reading files in parallel during scan
+##########
+
+# Set 4 partitions for deterministic output plans
+statement ok
+set datafusion.execution.target_partitions = 4;
+
+# automatically partition all files over 1 byte
+statement ok
+set datafusion.optimizer.repartition_file_min_size = 1;
+
+###################
+### Parquet tests
+###################
+
+# create a single parquet file
+# Note filename 2.parquet to test sorting (on local file systems it is often
listed before 1.parquet)
+statement ok
+COPY (VALUES (1), (2), (3), (4), (5)) TO
'test_files/scratch/repartition_scan/parquet_table/2.parquet'
+(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
+
+statement ok
+CREATE EXTERNAL TABLE parquet_table(column1 int)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/repartition_scan/parquet_table/';
+
+query I
+select * from parquet_table;
+----
+1
+2
+3
+4
+5
+
+## Expect to see the scan read the file as "4" groups with even sizes (offsets)
+query TT
+EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42;
+----
+logical_plan
+Filter: parquet_table.column1 != Int32(42)
+--TableScan: parquet_table projection=[column1],
partial_filters=[parquet_table.column1 != Int32(42)]
+physical_plan
+CoalesceBatchesExec: target_batch_size=8192
+--FilterExec: column1@0 != 42
+----ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]},
project [...]
+
+# create a second parquet file
+statement ok
+COPY (VALUES (100), (200)) TO
'test_files/scratch/repartition_scan/parquet_table/1.parquet'
+(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
+
+## Still expect to see the scan read the file as "4" groups with even sizes.
One group should read
+## parts of both files.
+query TT
+EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42 ORDER BY column1;
+----
+logical_plan
+Sort: parquet_table.column1 ASC NULLS LAST
+--Filter: parquet_table.column1 != Int32(42)
+----TableScan: parquet_table projection=[column1],
partial_filters=[parquet_table.column1 != Int32(42)]
+physical_plan
+SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
+--SortExec: expr=[column1@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=8192
+------FilterExec: column1@0 != 42
+--------ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206],
[WORKSPACE_RO [...]
+
+
+## Read the files as though they are ordered
+
+statement ok
+CREATE EXTERNAL TABLE parquet_table_with_order(column1 int)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/repartition_scan/parquet_table'
+WITH ORDER (column1 ASC);
+
+# output should be ordered
+query I
+SELECT column1 FROM parquet_table_with_order WHERE column1 <> 42 ORDER BY
column1;
+----
+1
+2
+3
+4
+5
+100
+200
+
+# explain should not have any groups with more than one file
+# https://github.com/apache/arrow-datafusion/issues/8451
+query TT
+EXPLAIN SELECT column1 FROM parquet_table_with_order WHERE column1 <> 42 ORDER
BY column1;
+----
+logical_plan
+Sort: parquet_table_with_order.column1 ASC NULLS LAST
+--Filter: parquet_table_with_order.column1 != Int32(42)
+----TableScan: parquet_table_with_order projection=[column1],
partial_filters=[parquet_table_with_order.column1 != Int32(42)]
+physical_plan
+SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: column1@0 != 42
+------ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206],
[WORKSPACE_ROOT [...]
+
+# Cleanup
+statement ok
+DROP TABLE parquet_table;
+
+statement ok
+DROP TABLE parquet_table_with_order;
+
+
+###################
+### CSV tests
+###################
+
+# Since parquet and CSV share most of the same implementation, this test checks
+# that the basics are connected properly
+
+# create a single csv file
+statement ok
+COPY (VALUES (1), (2), (3), (4), (5)) TO
'test_files/scratch/repartition_scan/csv_table/1.csv'
+(FORMAT csv, SINGLE_FILE_OUTPUT true, HEADER true);
+
+statement ok
+CREATE EXTERNAL TABLE csv_table(column1 int)
+STORED AS csv
+WITH HEADER ROW
+LOCATION 'test_files/scratch/repartition_scan/csv_table/';
+
+query I
+select * from csv_table;
+----
+1
+2
+3
+4
+5
+
+## Expect to see the scan read the file as "4" groups with even sizes (offsets)
+query TT
+EXPLAIN SELECT column1 FROM csv_table WHERE column1 <> 42;
+----
+logical_plan
+Filter: csv_table.column1 != Int32(42)
+--TableScan: csv_table projection=[column1],
partial_filters=[csv_table.column1 != Int32(42)]
+physical_plan
+CoalesceBatchesExec: target_batch_size=8192
+--FilterExec: column1@0 != 42
+----CsvExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:0..5],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:5..10],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:10..15],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:15..18]]},
projection=[column1], has_header=true
+
+# Cleanup
+statement ok
+DROP TABLE csv_table;
+
+
+###################
+### JSON tests
+###################
+
+# Since parquet and json share most of the same implementation, this test
checks
+# that the basics are connected properly
+
+# create a single json file
+statement ok
+COPY (VALUES (1), (2), (3), (4), (5)) TO
'test_files/scratch/repartition_scan/json_table/1.json'
+(FORMAT json, SINGLE_FILE_OUTPUT true);
+
+statement ok
+CREATE EXTERNAL TABLE json_table(column1 int)
+STORED AS json
+LOCATION 'test_files/scratch/repartition_scan/json_table/';
+
+query I
+select * from json_table;
+----
+1
+2
+3
+4
+5
+
+## In the future it would be cool to see the file read as "4" groups with even
sizes (offsets)
+## but for now it is just one group
+## https://github.com/apache/arrow-datafusion/issues/8502
+query TT
+EXPLAIN SELECT column1 FROM json_table WHERE column1 <> 42;
+----
+logical_plan
+Filter: json_table.column1 != Int32(42)
+--TableScan: json_table projection=[column1],
partial_filters=[json_table.column1 != Int32(42)]
+physical_plan
+CoalesceBatchesExec: target_batch_size=8192
+--FilterExec: column1@0 != 42
+----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------JsonExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json]]},
projection=[column1]
+
+
+# Cleanup
+statement ok
+DROP TABLE json_table;
+
+
+###################
+### Arrow File tests
+###################
+
+## Use pre-existing files, as we don't have a way to create arrow files yet
+## (https://github.com/apache/arrow-datafusion/issues/8504)
+statement ok
+CREATE EXTERNAL TABLE arrow_table
+STORED AS ARROW
+LOCATION '../core/tests/data/example.arrow';
+
+
+# It would be great to see the file read as "4" groups with even sizes
(offsets) eventually
+# https://github.com/apache/arrow-datafusion/issues/8503
+query TT
+EXPLAIN SELECT * FROM arrow_table
+----
+logical_plan TableScan: arrow_table projection=[f0, f1, f2]
+physical_plan ArrowExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow]]}, projection=[f0,
f1, f2]
+
+# Cleanup
+statement ok
+DROP TABLE arrow_table;
+
+###################
+### Avro File tests
+###################
+
+## Use pre-existing files, as we don't have a way to create avro files yet
+
+statement ok
+CREATE EXTERNAL TABLE avro_table
+STORED AS AVRO
+WITH HEADER ROW
+LOCATION '../../testing/data/avro/simple_enum.avro'
+
+
+# It would be great to see the file read as "4" groups with even sizes
(offsets) eventually
+query TT
+EXPLAIN SELECT * FROM avro_table
+----
+logical_plan TableScan: avro_table projection=[f1, f2, f3]
+physical_plan AvroExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/avro/simple_enum.avro]]}, projection=[f1, f2, f3]
+
+# Cleanup
+statement ok
+DROP TABLE avro_table;