alamb commented on code in PR #22607:
URL: https://github.com/apache/datafusion/pull/22607#discussion_r3323904683


##########
datafusion/sqllogictest/src/test_context.rs:
##########
@@ -286,6 +303,212 @@ fn register_strict_schema_provider(ctx: &SessionContext) {
     );
 }
 
+// ==============================================================================
+// Range Partitioned Table (sqllogictest-only)
+// ==============================================================================
+
+#[derive(Debug)]
+struct RangePartitionedTable {
+    schema: SchemaRef,
+    partitions: Vec<Vec<RecordBatch>>,
+    range_column_index: usize,
+    split_points: Vec<SplitPoint>,
+}
+
+#[async_trait]
+impl TableProvider for RangePartitionedTable {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let projected_schema = project_schema(&self.schema, projection)?;
+        let mut source = MemorySourceConfig::try_new(
+            &self.partitions,
+            Arc::clone(&self.schema),
+            projection.cloned(),
+        )?;
+        source = source.with_show_sizes(state.config_options().explain.show_sizes);
+
+        let output_partitioning =
+            self.output_partitioning(projection, &projected_schema)?;
+        let source = RangePartitionedSource {
+            inner: source,
+            output_partitioning,
+        };
+
+        Ok(DataSourceExec::from_data_source(source))
+    }
+}
+
+impl RangePartitionedTable {
+    fn output_partitioning(

Review Comment:
   I didn't realize it would take so much ceremony to get partitioned data (aka creating an entire DataSource/TableProvider), and I agree with @gabotechs that if we go down this approach, it will be hard to generate all the cases we would like to test.
   
   So I would suggest we start by integrating predefined partitioning into higher-level APIs, starting with `FileScanConfig`, which I think will eventually be needed to use this feature.
   
   I dug around a bit, and it seems like it will be straightforward to make `FileScanConfig::with_partitioned_by_file_group` more general:
   
https://github.com/apache/datafusion/blob/04c01bba3220becdacdca6772784a3c226047063/datafusion/datasource/src/file_scan_config/mod.rs#L494-L505
   
   What I suggest we do (could be a follow-on PR):
   1. Update `FileScanConfig` so it specifies a predefined output partitioning (`output_partitioning: Option<Partitioning>`) rather than a bool
   2. Deprecate `FileScanConfig::with_partitioned_by_file_group`
   3. Add a new `FileScanConfig::with_output_partitioning`, similar to `FileScanConfig::with_output_ordering`
   4. Use `DataSourceExec::from(...)` for that config
   
   That will save a lot of boilerplate in this setup, and I think you'll need the more general form to take advantage of range partitioning externally (aka it won't be wasted code).
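   A minimal sketch of steps 1 and 3, using hypothetical, simplified stand-ins (`Partitioning`, `ScanConfig`) rather than DataFusion's real `FileScanConfig` API: the bool becomes an `Option<Partitioning>`, and a `with_output_partitioning` builder records the declared layout. The partition-count check is an assumption added for illustration; it is not part of the proposal above.

   ```rust
   /// Hypothetical, simplified stand-in for DataFusion's `Partitioning`.
   #[derive(Debug, Clone, PartialEq)]
   enum Partitioning {
       /// Partition count only; no guarantee about which rows go where.
       UnknownPartitioning(usize),
       /// Range partitioning on one column, described by ascending split points.
       Range { column: String, split_points: Vec<i64> },
   }

   impl Partitioning {
       fn partition_count(&self) -> usize {
           match self {
               Partitioning::UnknownPartitioning(n) => *n,
               // N split points define N + 1 partitions.
               Partitioning::Range { split_points, .. } => split_points.len() + 1,
           }
       }
   }

   /// Hypothetical stand-in for `FileScanConfig`: the boolean flag is replaced
   /// by an optional, user-declared output partitioning.
   #[derive(Debug)]
   struct ScanConfig {
       /// Number of file groups; each group becomes one output partition.
       file_groups: usize,
       output_partitioning: Option<Partitioning>,
   }

   impl ScanConfig {
       fn new(file_groups: usize) -> Self {
           Self { file_groups, output_partitioning: None }
       }

       /// Analogous to `with_output_ordering`: declare a partitioning the data
       /// already satisfies. (Assumption) rejects declarations whose partition
       /// count does not match the number of file groups.
       fn with_output_partitioning(mut self, partitioning: Partitioning) -> Result<Self, String> {
           let declared = partitioning.partition_count();
           if declared != self.file_groups {
               return Err(format!(
                   "declared {declared} partitions but have {} file groups",
                   self.file_groups
               ));
           }
           self.output_partitioning = Some(partitioning);
           Ok(self)
       }

       /// What the scan would report: the declared partitioning if present,
       /// otherwise unknown partitioning with one partition per file group.
       fn output_partitioning(&self) -> Partitioning {
           self.output_partitioning
               .clone()
               .unwrap_or(Partitioning::UnknownPartitioning(self.file_groups))
       }
   }
   ```

   Falling back to `UnknownPartitioning` when nothing is declared keeps the undeclared case unchanged, which is what an optional field buys over a bool.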
   
   
   Eventually, I think we should target [ListingTable](https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html)s that declare they are range partitioned via `ListingOptions` (and thus, eventually, via SQL).
   
   However, it seems like `ListingTable`/`ListingOptions` don't have the output hooks yet, so we need to build up to that incrementally.
   
https://github.com/apache/datafusion/blob/4c909bafc5c50749884fdd80a06235d7bd72dbde/datafusion/catalog-listing/src/options.rs#L32-L31
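   Much of the ceremony above exists to compute the scan's output partitioning under a projection. The behavior that `output_partitioning(projection, &projected_schema)` needs, per the TEST 2 comment in `range_partitioning.slt`, can be modeled without any DataSource boilerplate. This is a sketch with a hypothetical, simplified `ScanPartitioning` enum, not the PR's implementation: if the range column survives the projection, report Range on its projected index; otherwise fall back to `UnknownPartitioning` with the same partition count.

   ```rust
   /// Hypothetical, simplified model of the partitioning a scan reports.
   #[derive(Debug, PartialEq)]
   enum ScanPartitioning {
       /// Range partitioning on the column at this index of the *projected* schema.
       Range { column_index: usize, partition_count: usize },
       UnknownPartitioning(usize),
   }

   /// Projection-aware output partitioning (sketch).
   fn output_partitioning(
       range_column_index: usize,
       partition_count: usize,
       projection: Option<&Vec<usize>>,
   ) -> ScanPartitioning {
       let projected_index = match projection {
           // No projection: the schema, and thus the column index, is unchanged.
           None => Some(range_column_index),
           // Find where the range column landed in the projected schema, if at all.
           Some(indices) => indices.iter().position(|&i| i == range_column_index),
       };
       match projected_index {
           Some(column_index) => ScanPartitioning::Range { column_index, partition_count },
           // Range column projected away: the partition count is still known.
           None => ScanPartitioning::UnknownPartitioning(partition_count),
       }
   }
   ```

   For `range_partitioned(range_key, non_range_key, value)`, projecting `[1, 2]` (TEST 2) drops `range_key` and yields `UnknownPartitioning(4)`, while projecting `[2, 0]` keeps Range partitioning on projected index 1.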
   
   



##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -171,6 +171,10 @@ impl Display for Partitioning {
 /// Values equal to split point `i` belong to partition `i + 1`, so interior
 /// partitions are lower-inclusive and upper-exclusive.
 ///
+/// If a source declares range partitioning, it is responsible for placing each

Review Comment:
   This is true for any user-specified data properties (e.g. listing tables in directories, existing sort orders, etc.). I am just saying it might be good to clarify, with something like:
   
   "Like other user-specified data properties such as sortedness, if a source declares ..."
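   For reference, the split-point rule in the doc comment above (values equal to split point `i` go to partition `i + 1`) and the responsibility it places on a source that declares range partitioning can be sketched as follows. `partition_for` and `find_misplaced` are hypothetical helpers on plain `i64` keys, not DataFusion APIs.

   ```rust
   /// Hypothetical helper: returns the partition index for `value`, given
   /// ascending `split_points`. Values equal to split point `i` belong to
   /// partition `i + 1`, so the first partition is unbounded below, the last
   /// is unbounded above, and interior partitions are lower-inclusive and
   /// upper-exclusive.
   fn partition_for(value: i64, split_points: &[i64]) -> usize {
       // Count the split points <= value. `partition_point` assumes the
       // predicate holds for a prefix of the slice, which is true because
       // `split_points` is sorted ascending.
       split_points.partition_point(|&split| split <= value)
   }

   /// Hypothetical check of the source's contract: every value in physical
   /// partition `p` must map to partition `p`. Returns the first misplaced
   /// `(partition, value)` pair, if any.
   fn find_misplaced(partitions: &[Vec<i64>], split_points: &[i64]) -> Option<(usize, i64)> {
       for (p, values) in partitions.iter().enumerate() {
           for &value in values {
               if partition_for(value, split_points) != p {
                   return Some((p, value));
               }
           }
       }
       None
   }
   ```

   With split points `[10, 20, 30]`, the `range_key` values from the slt header (`[1, 5]`, `[10, 15]`, `[20, 25]`, `[30, 35]`) all land in their declared partitions; moving `10` into partition 0 would be reported as misplaced.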



##########
datafusion/sqllogictest/src/test_context.rs:
##########
@@ -286,6 +303,212 @@ fn register_strict_schema_provider(ctx: &SessionContext) {
     );
 }
 
+// ==============================================================================

Review Comment:
   If we really need a few hundred lines of setup, maybe we can put it into a module like `datafusion/sqllogictest/src/test_context/range_partitioning.rs`.
   
   However, I have suggestions below that I think could make this substantially smaller.



##########
datafusion/sqllogictest/test_files/range_partitioning.slt:
##########
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# The sqllogictest harness registers range_partitioned(range_key, non_range_key, value)
+# as an in-memory source with four physical source partitions:
+#
+# partition 0: range_key in [-inf, 10), rows (1, 1, 10), (5, 2, 50)
+# partition 1: range_key in [10, 20), rows (10, 1, 100), (15, 2, 150)
+# partition 2: range_key in [20, 30), rows (20, 1, 200), (25, 2, 250)
+# partition 3: range_key in [30, +inf), rows (30, 1, 300), (35, 2, 350)
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+##########
+# TEST 1: Aggregate on Range Partition Column
+# Scanning range_key preserves source Range partitioning metadata.
+# Planning still inserts Hash repartitioning today; later optimizer PRs can
+# use this baseline to show when the repartition is removed.
+##########
+
+query TT
+EXPLAIN SELECT range_key, SUM(value) FROM range_partitioned GROUP BY range_key;
+----
+physical_plan
+01)AggregateExec: mode=FinalPartitioned, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)]
+02)--RepartitionExec: partitioning=Hash([range_key@0], 4), input_partitions=4
+03)----AggregateExec: mode=Partial, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)]
+04)------DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4)
+
+query II
+SELECT range_key, SUM(value) FROM range_partitioned GROUP BY range_key ORDER BY range_key;
+----
+1 10
+5 50
+10 100
+15 150
+20 200
+25 250
+30 300
+35 350
+
+
+##########
+# TEST 2: Aggregate on Non-Range Column
+# Projecting away range_key means the scan output no longer contains the
+# expression needed to describe range partitioning, so it reports
+# UnknownPartitioning with the same partition count.
+##########
+
+query TT
+EXPLAIN SELECT non_range_key, SUM(value) FROM range_partitioned GROUP BY non_range_key;
+----
+physical_plan
+01)AggregateExec: mode=FinalPartitioned, gby=[non_range_key@0 as non_range_key], aggr=[sum(range_partitioned.value)]
+02)--RepartitionExec: partitioning=Hash([non_range_key@0], 4), input_partitions=4
+03)----AggregateExec: mode=Partial, gby=[non_range_key@0 as non_range_key], aggr=[sum(range_partitioned.value)]
+04)------DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=UnknownPartitioning(4)
+
+query II
+SELECT non_range_key, SUM(value) FROM range_partitioned GROUP BY non_range_key ORDER BY non_range_key;
+----
+1 610
+2 800
+
+
+##########
+# TEST 3: Join on Range Partition Column
+# Both inputs expose Range partitioning on range_key. Join planning currently
+# reaches the unsupported Range output-partitioning path; later optimizer PRs
+# can replace this baseline with a successful plan and result test.
+##########
+
+query error This feature is not implemented: Join output partitioning with range partitioning is not implemented
+SELECT l.range_key, l.value, r.value
+FROM range_partitioned l
+JOIN range_partitioned r ON l.range_key = r.range_key;
+
+##########
+# TEST 4: Union of Range Partitioned Inputs
+# Each input exposes Range partitioning on range_key. This records current
+# UNION ALL behavior before later PRs decide whether compatible range inputs can
+# preserve Range partitioning across the union.
+##########
+
+query TT
+EXPLAIN SELECT range_key, value FROM range_partitioned
+UNION ALL
+SELECT range_key, value FROM range_partitioned;
+----
+physical_plan
+01)UnionExec
+02)--DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4)
+03)--DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4)

Review Comment:
   Love to see the output partitioning here.
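   The TEST 1 plan above still hash-repartitions on `range_key`. The reason that repartition could eventually be dropped: when the input is range partitioned on the group key, each key appears in exactly one partition, so per-partition aggregates are already final. A minimal sketch on plain `(key, value)` pairs, using the rows from the slt header (hypothetical helpers, not DataFusion code):

   ```rust
   use std::collections::BTreeMap;

   /// Sums `(key, value)` rows independently within each partition and
   /// concatenates the per-partition results. Only correct when no key
   /// appears in more than one partition, which range partitioning on `key`
   /// guarantees.
   fn aggregate_per_partition(partitions: &[Vec<(i64, i64)>]) -> Vec<(i64, i64)> {
       let mut out = Vec::new();
       for rows in partitions {
           let mut sums: BTreeMap<i64, i64> = BTreeMap::new();
           for &(key, value) in rows {
               *sums.entry(key).or_insert(0) += value;
           }
           out.extend(sums);
       }
       out
   }

   /// Reference: one global aggregation over all rows, i.e. the result the
   /// Hash repartition + FinalPartitioned aggregate must produce.
   fn aggregate_global(partitions: &[Vec<(i64, i64)>]) -> Vec<(i64, i64)> {
       let mut sums: BTreeMap<i64, i64> = BTreeMap::new();
       for &(key, value) in partitions.iter().flatten() {
           *sums.entry(key).or_insert(0) += value;
       }
       sums.into_iter().collect()
   }
   ```

   Grouping by `range_key`, both functions agree. Grouping by `non_range_key` (TEST 2), keys 1 and 2 appear in every partition, so per-partition results contain duplicates and a repartition (or final merge) is still required; the global sums are `(1, 610)` and `(2, 800)`, matching the expected query output.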



##########
datafusion/sqllogictest/test_files/range_partitioning.slt:
##########
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# The sqllogictest harness registers range_partitioned(range_key, non_range_key, value)
+# as an in-memory source with four physical source partitions:
+#
+# partition 0: range_key in [1, 10), rows (1, 1, 10), (5, 2, 50)
+# partition 1: range_key in [10, 20), rows (10, 1, 100), (15, 2, 150)
+# partition 2: range_key in [20, 30), rows (20, 1, 200), (25, 2, 250)
+# partition 3: range_key in [30, ...), rows (30, 1, 300), (35, 2, 350)
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+##########
+# TEST 1: Aggregate on Range Partition Column
+# Scanning range_key preserves source Range partitioning metadata.
+# Planning still inserts Hash repartitioning today; later optimizer PRs can
+# use this baseline to show when the repartition is removed.
+##########
+
+query TT
+EXPLAIN SELECT range_key, SUM(value) FROM range_partitioned GROUP BY range_key;
+----
+physical_plan
+01)AggregateExec: mode=FinalPartitioned, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)]
+02)--RepartitionExec: partitioning=Hash([range_key@0], 4), input_partitions=4
+03)----AggregateExec: mode=Partial, gby=[range_key@0 as range_key], aggr=[sum(range_partitioned.value)]
+04)------DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], output_partitioning=Range([range_key@0 ASC], [(10), (20), (30)], 4)
+

Review Comment:
   Maybe we could change the display to show the first/last partitions as open (rather than explicitly naming inf).
   
   Instead of
   >  ([-inf, 10), [10, 20), [20, 30), [30, +inf)).
   
   something like
   
   >  ([..., 10), [10, 20), [20, 30), [30, ...)).
   
   So don't actually change the implementation, just how it is displayed.
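   A rough sketch of display-only formatting over plain split-point values; `format_range_bounds` is a hypothetical helper, not an existing DataFusion function. Only the rendering changes; the partition bounds stay the same.

   ```rust
   use std::fmt::Display;

   /// Hypothetical helper: renders range-partition bounds with open-ended
   /// first and last partitions instead of naming -inf/+inf.
   fn format_range_bounds<T: Display>(split_points: &[T]) -> String {
       if split_points.is_empty() {
           // No split points: a single partition covers the whole domain.
           return "[..., ...)".to_string();
       }
       let mut parts = Vec::with_capacity(split_points.len() + 1);
       // First partition: everything below the first split point.
       parts.push(format!("[..., {})", split_points[0]));
       // Interior partitions: lower-inclusive, upper-exclusive.
       for pair in split_points.windows(2) {
           parts.push(format!("[{}, {})", pair[0], pair[1]));
       }
       // Last partition: everything at or above the last split point.
       parts.push(format!("[{}, ...)", split_points[split_points.len() - 1]));
       parts.join(", ")
   }
   ```

   For split points `[10, 20, 30]` this produces `[..., 10), [10, 20), [20, 30), [30, ...)`.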



##########
datafusion/sqllogictest/src/test_context.rs:
##########
@@ -286,6 +303,212 @@ fn register_strict_schema_provider(ctx: &SessionContext) {
     );
 }
 
+// ==============================================================================
+// Range Partitioned Table (sqllogictest-only)
+// ==============================================================================
+
+#[derive(Debug)]
+struct RangePartitionedTable {
+    schema: SchemaRef,
+    partitions: Vec<Vec<RecordBatch>>,
+    range_column_index: usize,

Review Comment:
   I like starting simple. If you put that rationale in the comments, it might help reduce the number of times you have to repeat it 😆
   Something like:
   
   ```rust
   /// Simple range partitioned table for testing, before
   /// declaring such tables is supported via SQL
   #[derive(Debug)]
   struct RangePartitionedTable {
   ...
   }
   ```


