adriangb commented on code in PR #18817:
URL: https://github.com/apache/datafusion/pull/18817#discussion_r2582071371


##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -485,6 +490,15 @@ impl ParquetSource {
             )),
         }
     }
+
+    pub fn with_reverse_scan(mut self, reverse_scan: bool) -> Self {
+        self.reverse_scan = reverse_scan;
+        self
+    }

Review Comment:
   Are these used from production code (the optimizers, etc.) or only from our 
tests?



##########
datafusion/common/src/config.rs:
##########
@@ -831,6 +831,15 @@ config_namespace! {
         /// writing out already in-memory data, such as from a cached
         /// data frame.
         pub maximum_buffered_record_batches_per_stream: usize, default = 2
+
+        /// Enable sort pushdown optimization for sorted Parquet files.
+        /// Currently, this optimization only has reverse order support.
+        /// When a query requires ordering that can be satisfied by reversing
+        /// the file's natural ordering, row groups and batches are read in
+        /// reverse order to eliminate sort operations.
+        /// Note: This buffers one row group at a time (typically ~128MB).
+        /// Default: true

Review Comment:
   Can this support non-reverse cases where the query order is the same as the 
file order? i.e. can we eliminate sorts in that simpler case as well? Or does 
that already happen / was already implemented?



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -766,6 +766,131 @@ impl DataSource for FileScanConfig {
             }
         }
     }
+
+    fn try_pushdown_sort(
+        &self,
+        order: &[PhysicalSortExpr],
+    ) -> Result<Option<Arc<dyn DataSource>>> {
+        let current_ordering = match self.output_ordering.first() {

Review Comment:
   Nit for a bigger picture plan: I think we should re-arrange things such that 
the ordering of each file is recorded in `PartitionedFile` and any known 
ordering of groups in `FileGroup`. Then `FileScanConfig` should calculate its 
output ordering from that instead of being given one. And if sort pushdown is 
requested then `FileScanConfig` can:
   1. Try to re-arrange the order by reversing groups, or by re-creating groups 
entirely using stats. This is enough for `Inexact` in 
https://github.com/apache/datafusion/pull/18817/files#r2582092389.
   2. Try to push down the preferred order to the `FileSource` which determines 
if the order of reading a given file can be reversed. Maybe it needs a 
reference to the files since it has to handle row group order i.e. Parquet 
specific stuff? But if it is able to reverse the order of the scans then the 
whole result becomes `Exact`.
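
   To make the "calculate its output ordering from that" part concrete, here is
a rough, standalone sketch. `FileInfo`, its fields and `group_ordering` are all
hypothetical stand-ins (not existing DataFusion types); it assumes each file
records its own ordering plus min/max stats for the leading sort column.

```rust
/// Hypothetical per-file metadata; in the real code this information would
/// live on `PartitionedFile`. All names here are made up for illustration.
#[derive(Debug, Clone, PartialEq)]
pub struct FileInfo {
    /// Columns the file is sorted by (ascending), if known.
    pub ordering: Option<Vec<String>>,
    /// Min and max of the leading sort column, taken from file statistics.
    pub min: i64,
    pub max: i64,
}

/// Derive the ordering of a file group from its files instead of being
/// handed one. Returns `None` when no ordering can be guaranteed.
pub fn group_ordering(files: &[FileInfo]) -> Option<Vec<String>> {
    // An empty group, or a first file without a known ordering, has no ordering.
    let first = files.first()?.ordering.clone()?;
    // Every file must declare the same ordering.
    let same_ordering = files.iter().all(|f| f.ordering.as_ref() == Some(&first));
    // Consecutive files must not overlap on the leading column. Strict `<` is
    // used so a tie across a file boundary cannot break the ordering of any
    // later sort columns.
    let non_overlapping = files.windows(2).all(|w| w[0].max < w[1].min);
    if same_ordering && non_overlapping {
        Some(first)
    } else {
        None
    }
}
```

   With something like this, reversing or re-creating groups from stats only
changes the input to `group_ordering`, and the declared output ordering follows
from the files rather than being supplied separately.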



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -806,4 +844,87 @@ mod tests {
         // same value. but filter() call Arc::clone internally
         assert_eq!(parquet_source.predicate(), parquet_source.filter().as_ref());
     }
+
+    #[test]
+    fn test_reverse_scan_default_value() {
+        use arrow::datatypes::Schema;
+
+        let schema = Arc::new(Schema::empty());
+        let source = ParquetSource::new(schema);
+
+        assert!(!source.reverse_scan());

Review Comment:
   We could just access the private field here since it's in the same module.



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -788,6 +803,29 @@ impl FileSource for ParquetSource {
     fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
         self.schema_adapter_factory.clone()
     }
+
+    /// When push down to parquet source of a sort operation is possible,
+    /// create a new ParquetSource with reverse_scan enabled.
+    /// TODO support more policies in addition to reversing the scan.
+    fn try_pushdown_sort(
+        &self,
+        _order: &[PhysicalSortExpr],
+    ) -> datafusion_common::Result<Option<Arc<dyn FileSource>>> {
+        // Note: We ignore the specific `order` parameter here because the decision
+        // about whether we can reverse is made at the FileScanConfig level.
+        // This method creates a reversed version of the current ParquetSource,
+        // and the FileScanConfig will reverse both the file list and the declared ordering.
+
+        // Clone the entire source to preserve ALL configuration including:
+        // - projection (CRITICAL: prevents schema mismatch)
+        // - predicate
+        // - batch_size
+        // - table_parquet_options
+        // - all other settings
+        let new_source = self.clone().with_reverse_scan(true);

Review Comment:
   If this is the only place `with_reverse_scan` is used I suggest we set the 
private field directly to avoid polluting the public API.
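
   Roughly what I have in mind, as a standalone sketch. `ParquetSourceSketch`
and its fields are hypothetical stand-ins for the real struct, and the
`try_pushdown_sort` signature is simplified; the point is only that code in the
same module can set the field without a public builder method.

```rust
mod source {
    /// Hypothetical, trimmed-down stand-in for `ParquetSource`.
    #[derive(Debug, Clone, Default)]
    pub struct ParquetSourceSketch {
        batch_size: Option<usize>,
        // Private: only code inside this module can set it.
        reverse_scan: bool,
    }

    impl ParquetSourceSketch {
        pub fn with_batch_size(mut self, batch_size: usize) -> Self {
            self.batch_size = Some(batch_size);
            self
        }

        pub fn batch_size(&self) -> Option<usize> {
            self.batch_size
        }

        /// Read-only accessor, kept here only so callers outside the module
        /// can observe the flag.
        pub fn reverse_scan(&self) -> bool {
            self.reverse_scan
        }

        /// Simplified signature: clone to keep all other configuration,
        /// then set the private field directly. No public
        /// `with_reverse_scan` is needed.
        pub fn try_pushdown_sort(&self) -> Option<Self> {
            let mut new_source = self.clone();
            new_source.reverse_scan = true;
            Some(new_source)
        }
    }
}
```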



##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -682,6 +684,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ) -> Option<Arc<dyn ExecutionPlan>> {
         None
     }
+
+    /// Try to create a new execution plan that satisfies the given sort ordering.
+    ///
+    /// Default implementation returns `Ok(None)`.
+    fn try_pushdown_sort(
+        &self,
+        _order: &[PhysicalSortExpr],
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {

Review Comment:
   I similarly think there should be the option to return a new plan that is 
optimized for the order but doesn't satisfy it enough to remove the sort 
(https://github.com/apache/datafusion/pull/18817/files#r2582092389)



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -788,6 +803,29 @@ impl FileSource for ParquetSource {
     fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
         self.schema_adapter_factory.clone()
     }
+
+    /// When push down to parquet source of a sort operation is possible,
+    /// create a new ParquetSource with reverse_scan enabled.
+    /// TODO support more policies in addition to reversing the scan.
+    fn try_pushdown_sort(
+        &self,
+        _order: &[PhysicalSortExpr],
+    ) -> datafusion_common::Result<Option<Arc<dyn FileSource>>> {
+        // Note: We ignore the specific `order` parameter here because the decision
+        // about whether we can reverse is made at the FileScanConfig level.
+        // This method creates a reversed version of the current ParquetSource,
+        // and the FileScanConfig will reverse both the file list and the declared ordering.

Review Comment:
   Just to clarify: since `FileScanConfig` is called first we are assuming that 
if we were called the intention is to reverse the order? I would rather verify 
this / decouple these two things. That is I'd rather parse the sort order here 
and do whatever we need to do internally to satisfy it (or respond `None` if we 
can't) instead of relying on `FileScanConfig` to do it for us.
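
   A minimal sketch of what "parse the sort order here" could look like,
assuming a simplified model: `SortKey` and `Source` below are hypothetical
stand-ins for `PhysicalSortExpr` and `ParquetSource`, and "reverse" is assumed
to mean flipping both the direction and the null placement of each key.

```rust
/// Hypothetical, simplified stand-in for a physical sort expression.
#[derive(Debug, Clone, PartialEq)]
pub struct SortKey {
    pub column: String,
    pub descending: bool,
    pub nulls_first: bool,
}

impl SortKey {
    /// The key describing the same column read in the opposite direction.
    pub fn reversed(&self) -> SortKey {
        SortKey {
            column: self.column.clone(),
            descending: !self.descending,
            nulls_first: !self.nulls_first,
        }
    }
}

/// Hypothetical stand-in for the file source.
#[derive(Debug, Clone, PartialEq)]
pub struct Source {
    /// Natural (on-disk) ordering of the file.
    pub file_ordering: Vec<SortKey>,
    pub reverse_scan: bool,
}

impl Source {
    /// Verify the requested order ourselves instead of trusting the caller:
    /// only reverse when `order` is a prefix of the reversed file ordering.
    pub fn try_pushdown_sort(&self, order: &[SortKey]) -> Option<Source> {
        if order.is_empty() || order.len() > self.file_ordering.len() {
            return None;
        }
        let is_reverse_prefix = order
            .iter()
            .zip(&self.file_ordering)
            .all(|(want, have)| *want == have.reversed());
        if !is_reverse_prefix {
            return None;
        }
        // Clone to preserve all other configuration, then flip the flag.
        let mut new_source = self.clone();
        new_source.reverse_scan = true;
        Some(new_source)
    }
}
```

   This way the source responds `None` on its own for any order it cannot
satisfy, regardless of what `FileScanConfig` decided beforehand.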



##########
datafusion/datasource/src/file.rs:
##########
@@ -129,6 +130,29 @@ pub trait FileSource: Send + Sync {
         ))
     }
 
+    /// Try to create a new FileSource that can produce data in the specified sort order.
+    ///
+    /// This allows file format implementations to optimize based on the required sort order.
+    /// For example:
+    /// - ParquetSource can reverse scan direction
+    /// - Future implementations might reorder row groups or use native indexes
+    ///
+    /// # Arguments
+    /// * `order` - The desired output ordering
+    ///
+    /// # Returns
+    /// * `Ok(Some(source))` - Created a source that can satisfy the ordering
+    /// * `Ok(None)` - Cannot optimize for this ordering
+    /// * `Err(e)` - Error occurred
+    ///
+    /// Default implementation returns `Ok(None)`.
+    fn try_pushdown_sort(
+        &self,
+        _order: &[PhysicalSortExpr],
+    ) -> Result<Option<Arc<dyn FileSource>>> {

Review Comment:
   I think it might be nice to make this an enum instead of an option. 
Specifically:
   
   ```rust
   pub enum SortOrderPushdownResult<T> {
       Exact { inner: T },
       Inexact { inner: T },
       Unsupported,
   }
   ```
   
   The idea being that `Inexact` means "I changed myself to better satisfy the 
sort order, but cannot guarantee it's perfectly sorted" (e.g. only re-arranged 
files and row groups based on stats but not the actual data stream).
   
   This matters because e.g. `Inexact` would already be a *huge* win for TopK + 
dynamic filter pushdown.
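
   For illustration, a caller could consume the three variants roughly like
this. This is only a sketch: `Plan` and `apply_pushdown` are hypothetical
stand-ins, not existing DataFusion types, and the enum is repeated so the
snippet compiles on its own.

```rust
/// The enum proposed above, repeated so the sketch is self-contained.
#[derive(Debug, PartialEq)]
pub enum SortOrderPushdownResult<T> {
    Exact { inner: T },
    Inexact { inner: T },
    Unsupported,
}

/// Hypothetical, minimal stand-in for a physical plan.
#[derive(Debug, PartialEq)]
pub enum Plan {
    Scan { reversed: bool },
    Sort { input: Box<Plan> },
}

/// Decide what to do with a `Sort` over `original_input`, given the result
/// of asking the input to push the sort down.
pub fn apply_pushdown(
    original_input: Plan,
    result: SortOrderPushdownResult<Plan>,
) -> Plan {
    match result {
        // Ordering is guaranteed: the sort can be removed entirely.
        SortOrderPushdownResult::Exact { inner } => inner,
        // Input is better arranged but not guaranteed sorted: keep the sort
        // on top of the rewritten input.
        SortOrderPushdownResult::Inexact { inner } => Plan::Sort {
            input: Box::new(inner),
        },
        // Nothing changed: keep the sort over the original input.
        SortOrderPushdownResult::Unsupported => Plan::Sort {
            input: Box::new(original_input),
        },
    }
}
```

   The `Inexact` arm is the interesting one: the sort stays in the plan, but it
runs over an input that was rearranged to better match the requested order.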



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

