NGA-TRAN commented on code in PR #9593:
URL: https://github.com/apache/datafusion/pull/9593#discussion_r1585329760


##########
datafusion/sqllogictest/test_files/parquet.slt:
##########
@@ -144,9 +148,7 @@ Sort: test_table.string_col ASC NULLS LAST, 
test_table.int_col ASC NULLS LAST
 --TableScan: test_table projection=[int_col, string_col]
 physical_plan
 SortPreservingMergeExec: [string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST]
---SortExec: expr=[string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST]
-----ParquetExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]},
 projection=[int_col, string_col]
-
+--ParquetExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]},
 projection=[int_col, string_col], output_ordering=[string_col@1 ASC NULLS 
LAST, int_col@0 ASC NULLS LAST]

Review Comment:
   So nice 🎉 



##########
datafusion/core/src/datasource/listing/mod.rs:
##########
@@ -67,6 +67,11 @@ pub struct PartitionedFile {
     pub partition_values: Vec<ScalarValue>,
     /// An optional file range for a more fine-grained parallel execution
     pub range: Option<FileRange>,
+    /// Optional statistics that describe the data in this file if known.
+    ///
+    /// DataFusion relies on these statistics for planning so if they are 
incorrect
+    /// incorrect answers may result.

Review Comment:
   ```suggestion
       /// DataFusion relies on these statistics for planning so if they are 
incorrect,
       /// incorrect answers may result.
   ```
   
   I am guessing you use statistics for column min and max and determine 
whether data overlaps or not, right?  And if they do not overlap, we do not 
need to merge them before sorting. Maybe adding that to make it clear what you 
mean about incorrect statistics will lead to incorrect results



##########
datafusion/sqllogictest/test_files/parquet.slt:
##########
@@ -132,8 +132,8 @@ STORED AS PARQUET;
 ----
 3
 
-# Check output plan again, expect an "output_ordering" clause in the 
physical_plan -> ParquetExec:
-# After https://github.com/apache/arrow-datafusion/pull/9593 this should not 
require a sort.
+# Check output plan again, expect no "output_ordering" clause in the 
physical_plan -> ParquetExec,
+# due to there being more files than partitions:

Review Comment:
   Nice negative test



##########
datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt:
##########
@@ -0,0 +1,352 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# TESTS FOR SORTED PARQUET FILES
+
+# Set 2 partitions for deterministic output plans
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+# Collect statistics -- used for sorting files
+statement ok
+set datafusion.execution.collect_statistics = true;
+
+# Create a table as a data source
+statement ok
+CREATE TABLE src_table (
+  int_col INT,
+  string_col TEXT,
+  bigint_col BIGINT,
+  date_col DATE
+) AS VALUES
+(1, 'aaa', 100, 1),
+(2, 'bbb', 200, 2),
+(3, 'ccc', 300, 3),
+(4, 'ddd', 400, 4),
+(5, 'eee', 500, 5),
+(6, 'fff', 600, 6),
+(7, 'ggg', 700, 7),
+(8, 'hhh', 800, 8),
+(9, 'iii', 900, 9);
+
+# Setup 2 files, i.e., as many as there are partitions:
+
+# File 1:
+query ITID
+COPY (SELECT * FROM src_table LIMIT 3)
+TO 'test_files/scratch/parquet/test_table/0.parquet'
+STORED AS PARQUET;
+----
+3
+
+# File 2:
+query ITID
+COPY (SELECT * FROM src_table WHERE int_col > 3 LIMIT 3)

Review Comment:
   Similar as above,  `SELECT * FROM src_table where int_col >=4 and int_col 
<=6 order by int_col`



##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -900,6 +940,62 @@ mod tests {
                 sort: vec![col("value").sort(true, false)],
                 expected_result: Err("construct min/max statistics\ncaused 
by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during 
planning: cannot sort by nullable column"),
             },
+            TestCase {
+                name: "all three non-overlapping",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
+                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Ok(vec![vec!["0", "1", "2"]]),
+            },
+            TestCase {
+                name: "all three overlapping",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
+            },
+            TestCase {
+                name: "empty input",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Ok(vec![]),
+            },
+            TestCase {
+                name: "one file missing statistics",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("2", "2023-01-02", vec![None]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Err("construct min/max statistics\ncaused 
by\ncollect min/max values\ncaused by\nError during planning: statistics not 
found"),
+            },

Review Comment:
   Nice newly added tests



##########
datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt:
##########
@@ -0,0 +1,352 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# TESTS FOR SORTED PARQUET FILES
+
+# Set 2 partitions for deterministic output plans
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+# Collect statistics -- used for sorting files
+statement ok
+set datafusion.execution.collect_statistics = true;
+
+# Create a table as a data source
+statement ok
+CREATE TABLE src_table (
+  int_col INT,
+  string_col TEXT,
+  bigint_col BIGINT,
+  date_col DATE
+) AS VALUES
+(1, 'aaa', 100, 1),
+(2, 'bbb', 200, 2),
+(3, 'ccc', 300, 3),
+(4, 'ddd', 400, 4),
+(5, 'eee', 500, 5),
+(6, 'fff', 600, 6),
+(7, 'ggg', 700, 7),
+(8, 'hhh', 800, 8),
+(9, 'iii', 900, 9);
+
+# Setup 2 files, i.e., as many as there are partitions:
+
+# File 1:
+query ITID
+COPY (SELECT * FROM src_table LIMIT 3)

Review Comment:
   I am a bit surprised this query always return the first 3 rows and its dat 
tis sorted. Maybe you want to make it deterministic in case this behavior no 
longer holds in the future by using `SELECT * FROM src_table where int_col <=3 
order by int_col`
   
   I think since you make your columns have corresponding increasing data, the 
order by can be on any column and your data is always sorted on any column



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -651,9 +651,13 @@ impl TableProvider for ListingTable {
                 output_ordering,
             )
         }) {
-            Some(Err(e)) => log::debug!("failed to sort file groups: {e}"),
+            Some(Err(e)) => log::debug!("failed to split file groups by 
statistics: {e}"),
             Some(Ok(new_groups)) => {
-                partitioned_file_lists = new_groups;
+                if new_groups.len() <= self.options.target_partitions {
+                    partitioned_file_lists = new_groups;
+                } else {
+                    log::debug!("attempted to split file groups by statistics, 
but there were more file groups than target_partitions; falling back to 
unordered")
+                }

Review Comment:
   👍 



##########
datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt:
##########
@@ -0,0 +1,352 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# TESTS FOR SORTED PARQUET FILES
+
+# Set 2 partitions for deterministic output plans
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+# Collect statistics -- used for sorting files
+statement ok
+set datafusion.execution.collect_statistics = true;
+
+# Create a table as a data source
+statement ok
+CREATE TABLE src_table (
+  int_col INT,
+  string_col TEXT,
+  bigint_col BIGINT,
+  date_col DATE
+) AS VALUES
+(1, 'aaa', 100, 1),
+(2, 'bbb', 200, 2),
+(3, 'ccc', 300, 3),
+(4, 'ddd', 400, 4),
+(5, 'eee', 500, 5),
+(6, 'fff', 600, 6),
+(7, 'ggg', 700, 7),
+(8, 'hhh', 800, 8),
+(9, 'iii', 900, 9);
+
+# Setup 2 files, i.e., as many as there are partitions:
+
+# File 1:
+query ITID
+COPY (SELECT * FROM src_table LIMIT 3)
+TO 'test_files/scratch/parquet/test_table/0.parquet'
+STORED AS PARQUET;
+----
+3
+
+# File 2:
+query ITID
+COPY (SELECT * FROM src_table WHERE int_col > 3 LIMIT 3)
+TO 'test_files/scratch/parquet/test_table/1.parquet'
+STORED AS PARQUET;
+----
+3
+
+# Create a table from generated parquet files, without ordering:
+statement ok
+CREATE EXTERNAL TABLE test_table (
+  int_col INT,
+  string_col TEXT,
+  bigint_col BIGINT,
+  date_col DATE
+)
+STORED AS PARQUET
+WITH HEADER ROW
+LOCATION 'test_files/scratch/parquet/test_table';
+
+# Basic query:
+query ITID
+SELECT * FROM test_table ORDER BY int_col;
+----
+1 aaa 100 1970-01-02
+2 bbb 200 1970-01-03
+3 ccc 300 1970-01-04
+4 ddd 400 1970-01-05
+5 eee 500 1970-01-06
+6 fff 600 1970-01-07
+
+# Check output plan, expect no "output_ordering" clause in the physical_plan 
-> ParquetExec:
+query TT
+EXPLAIN SELECT int_col, string_col
+FROM test_table
+ORDER BY string_col, int_col;
+----
+logical_plan
+Sort: test_table.string_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST
+--TableScan: test_table projection=[int_col, string_col]
+physical_plan
+SortPreservingMergeExec: [string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST]
+--SortExec: expr=[string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST]
+----ParquetExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet]]},
 projection=[int_col, string_col]
+
+# Tear down test_table:
+statement ok
+DROP TABLE test_table;
+
+# Create test_table again, but with ordering:
+statement ok
+CREATE EXTERNAL TABLE test_table (
+  int_col INT,
+  string_col TEXT,
+  bigint_col BIGINT,
+  date_col DATE
+)
+STORED AS PARQUET
+WITH HEADER ROW
+WITH ORDER (string_col ASC NULLS LAST, int_col ASC NULLS LAST)
+LOCATION 'test_files/scratch/parquet/test_table';
+
+# Check output plan, expect an "output_ordering" clause in the physical_plan 
-> ParquetExec:
+query TT
+EXPLAIN SELECT int_col, string_col
+FROM test_table
+ORDER BY string_col, int_col;
+----
+logical_plan
+Sort: test_table.string_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST
+--TableScan: test_table projection=[int_col, string_col]
+physical_plan
+SortPreservingMergeExec: [string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST]
+--ParquetExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet]]},
 projection=[int_col, string_col], output_ordering=[string_col@1 ASC NULLS 
LAST, int_col@0 ASC NULLS LAST]
+
+# Add another file to the directory underlying test_table
+query ITID
+COPY (SELECT * FROM src_table WHERE int_col > 6 LIMIT 3)

Review Comment:
   Same as above, you want filter that provide deterministic result and data is 
sorted



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to