adriangb commented on code in PR #18817:
URL: https://github.com/apache/datafusion/pull/18817#discussion_r2582071371
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -485,6 +490,15 @@ impl ParquetSource {
)),
}
}
+
+ pub fn with_reverse_scan(mut self, reverse_scan: bool) -> Self {
+ self.reverse_scan = reverse_scan;
+ self
+ }
Review Comment:
Are these used from production code (the optimizers, etc.) or only from our
tests?
##########
datafusion/common/src/config.rs:
##########
@@ -831,6 +831,15 @@ config_namespace! {
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2
+
+ /// Enable sort pushdown optimization for sorted Parquet files.
+ /// Currently, this optimization only has reverse order support.
+ /// When a query requires ordering that can be satisfied by reversing
+ /// the file's natural ordering, row groups and batches are read in
+ /// reverse order to eliminate sort operations.
+ /// Note: This buffers one row group at a time (typically ~128MB).
+ /// Default: true
Review Comment:
Can this support non-reverse cases where the query order is the same as the
file order? i.e. can we eliminate sorts in that simpler case as well? Or does
that already happen / was already implemented?
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -766,6 +766,131 @@ impl DataSource for FileScanConfig {
}
}
}
+
+ fn try_pushdown_sort(
+ &self,
+ order: &[PhysicalSortExpr],
+ ) -> Result<Option<Arc<dyn DataSource>>> {
+ let current_ordering = match self.output_ordering.first() {
Review Comment:
Nit for a bigger picture plan: I think we should re-arrange things such that
the ordering of each file is recorded in `PartitionedFile` and any known
ordering of groups in `FileGroup`. Then `FileScanConfig` should calculate its
output ordering from that instead of being given one. And if sort pushdown is
requested then `FileScanConfig` can:
1. Try to re-arrange the order by reversing groups or by re-creating groups
entirely using stats. This is enough for `Inexact` in
https://github.com/apache/datafusion/pull/18817/files#r2582092389.
2. Try to push down the preferred order to the `FileSource` which determines
if the order of reading a given file can be reversed. Maybe it needs a
reference to the files since it has to handle row group order i.e. Parquet
specific stuff? But if it is able to reverse the order of the scans then the
whole result becomes `Exact`.
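A rough sketch of what I mean by deriving the ordering instead of being handed one. Everything here (`Direction`, `FileEntry`, `arrange_by_stats`, `group_ordering_holds`) is a hypothetical stand-in for illustration, not the real `PartitionedFile` / `FileGroup` API, and it assumes a single integer sort column with min/max stats:

```rust
/// Hypothetical stand-ins for illustration; not the real DataFusion types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    Asc,
    Desc,
}

#[derive(Debug, Clone, PartialEq)]
pub struct FileEntry {
    pub path: String,
    /// Min and max of the sort column, taken from file statistics.
    pub min: i64,
    pub max: i64,
    /// Direction the rows inside the file are known to be sorted in, if any.
    pub ordering: Option<Direction>,
}

/// Step 1: arrange the files of a group by statistics so that their value
/// ranges appear in the requested direction. Row order inside each file is
/// untouched, so on its own this can only ever be an inexact improvement.
pub fn arrange_by_stats(mut files: Vec<FileEntry>, want: Direction) -> Vec<FileEntry> {
    files.sort_by_key(|f| f.min);
    if want == Direction::Desc {
        files.reverse();
    }
    files
}

/// The group's output ordering is derived rather than declared: it holds only
/// if every file is sorted in `dir` and consecutive files do not overlap.
pub fn group_ordering_holds(files: &[FileEntry], dir: Direction) -> bool {
    let each_sorted = files.iter().all(|f| f.ordering == Some(dir));
    let non_overlapping = files.windows(2).all(|w| match dir {
        Direction::Asc => w[0].max <= w[1].min,
        Direction::Desc => w[0].min >= w[1].max,
    });
    each_sorted && non_overlapping
}
```

With two ascending files re-arranged for a descending request, `group_ordering_holds` is still false after step 1 alone; it only becomes true once step 2 reverses the scan of each file.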
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -806,4 +844,87 @@ mod tests {
// same value. but filter() call Arc::clone internally
assert_eq!(parquet_source.predicate(),
parquet_source.filter().as_ref());
}
+
+ #[test]
+ fn test_reverse_scan_default_value() {
+ use arrow::datatypes::Schema;
+
+ let schema = Arc::new(Schema::empty());
+ let source = ParquetSource::new(schema);
+
+ assert!(!source.reverse_scan());
Review Comment:
We could just access the private field here since it's in the same module.
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -788,6 +803,29 @@ impl FileSource for ParquetSource {
fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
self.schema_adapter_factory.clone()
}
+
+ /// When push down to parquet source of a sort operation is possible,
+ /// create a new ParquetSource with reverse_scan enabled.
+ /// TODO support more policies in addition to reversing the scan.
+ fn try_pushdown_sort(
+ &self,
+ _order: &[PhysicalSortExpr],
+ ) -> datafusion_common::Result<Option<Arc<dyn FileSource>>> {
+ // Note: We ignore the specific `order` parameter here because the decision
+ // about whether we can reverse is made at the FileScanConfig level.
+ // This method creates a reversed version of the current ParquetSource,
+ // and the FileScanConfig will reverse both the file list and the declared ordering.
+
+ // Clone the entire source to preserve ALL configuration including:
+ // - projection (CRITICAL: prevents schema mismatch)
+ // - predicate
+ // - batch_size
+ // - table_parquet_options
+ // - all other settings
+ let new_source = self.clone().with_reverse_scan(true);
Review Comment:
If this is the only place `with_reverse_scan` is used I suggest we set the
private field directly to avoid polluting the public API.
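Something along these lines, as a minimal sketch. `Source` and `reversed` are hypothetical, trimmed-down stand-ins for `ParquetSource` and the body of `try_pushdown_sort`; only the `reverse_scan` field and getter names come from the PR:

```rust
/// Hypothetical, trimmed-down stand-in for `ParquetSource`.
#[derive(Debug, Clone, Default)]
pub struct Source {
    pub batch_size: Option<usize>,
    // Private field: no public `with_reverse_scan` builder is exposed for it.
    reverse_scan: bool,
}

impl Source {
    /// Mirrors the shape of `try_pushdown_sort`: clone to keep every other
    /// setting, then flip the private field in place.
    pub fn reversed(&self) -> Source {
        let mut new_source = self.clone();
        new_source.reverse_scan = true;
        new_source
    }

    /// Read-only accessor, as already used by the tests in this PR.
    pub fn reverse_scan(&self) -> bool {
        self.reverse_scan
    }
}
```

The clone still preserves all other configuration, and callers outside the module have no way to toggle the flag themselves.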
##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -682,6 +684,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
) -> Option<Arc<dyn ExecutionPlan>> {
None
}
+
+ /// Try to create a new execution plan that satisfies the given sort ordering.
+ ///
+ /// Default implementation returns `Ok(None)`.
+ fn try_pushdown_sort(
+ &self,
+ _order: &[PhysicalSortExpr],
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
Review Comment:
I similarly think there should be the option to return a new plan that is
optimized for the order but doesn't satisfy it enough to remove the sort
(https://github.com/apache/datafusion/pull/18817/files#r2582092389)
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -788,6 +803,29 @@ impl FileSource for ParquetSource {
fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
self.schema_adapter_factory.clone()
}
+
+ /// When push down to parquet source of a sort operation is possible,
+ /// create a new ParquetSource with reverse_scan enabled.
+ /// TODO support more policies in addition to reversing the scan.
+ fn try_pushdown_sort(
+ &self,
+ _order: &[PhysicalSortExpr],
+ ) -> datafusion_common::Result<Option<Arc<dyn FileSource>>> {
+ // Note: We ignore the specific `order` parameter here because the decision
+ // about whether we can reverse is made at the FileScanConfig level.
+ // This method creates a reversed version of the current ParquetSource,
+ // and the FileScanConfig will reverse both the file list and the declared ordering.
Review Comment:
Just to clarify: since `FileScanConfig` is called first, are we assuming that
if we were called the intention is to reverse the order? I would rather verify
this / decouple these two things. That is, I'd rather parse the sort order here
and do whatever we need to do internally to satisfy it (or respond `None` if we
can't) instead of relying on `FileScanConfig` to do it for us.
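To illustrate the kind of check I have in mind, here is a minimal sketch. `SortKey` and `satisfied_by_reverse` are hypothetical simplifications; the real code would work on `PhysicalSortExpr` and the source's known ordering. It assumes a request is satisfiable by a reverse scan when it is a prefix of the natural ordering with every key's direction and null placement flipped:

```rust
/// Hypothetical simplified sort key; the real code would use `PhysicalSortExpr`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortKey {
    pub column: String,
    pub descending: bool,
    pub nulls_first: bool,
}

impl SortKey {
    /// Reading a sorted stream back to front flips both the direction and
    /// the position of nulls.
    pub fn reversed(&self) -> SortKey {
        SortKey {
            column: self.column.clone(),
            descending: !self.descending,
            nulls_first: !self.nulls_first,
        }
    }
}

/// Returns true when `requested` is satisfied by reading data sorted by
/// `natural` in reverse, i.e. `requested` is a non-empty prefix of the
/// reversed natural ordering.
pub fn satisfied_by_reverse(natural: &[SortKey], requested: &[SortKey]) -> bool {
    !requested.is_empty()
        && requested.len() <= natural.len()
        && requested
            .iter()
            .zip(natural)
            .all(|(req, nat)| *req == nat.reversed())
}
```

With a check like this inside the source, `try_pushdown_sort` could return `None` for any request it cannot honor, regardless of what `FileScanConfig` decided beforehand.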
##########
datafusion/datasource/src/file.rs:
##########
@@ -129,6 +130,29 @@ pub trait FileSource: Send + Sync {
))
}
+ /// Try to create a new FileSource that can produce data in the specified sort order.
+ ///
+ /// This allows file format implementations to optimize based on the required sort order.
+ /// For example:
+ /// - ParquetSource can reverse scan direction
+ /// - Future implementations might reorder row groups or use native indexes
+ ///
+ /// # Arguments
+ /// * `order` - The desired output ordering
+ ///
+ /// # Returns
+ /// * `Ok(Some(source))` - Created a source that can satisfy the ordering
+ /// * `Ok(None)` - Cannot optimize for this ordering
+ /// * `Err(e)` - Error occurred
+ ///
+ /// Default implementation returns `Ok(None)`.
+ fn try_pushdown_sort(
+ &self,
+ _order: &[PhysicalSortExpr],
+ ) -> Result<Option<Arc<dyn FileSource>>> {
Review Comment:
I think it might be nice to make this an enum instead of an option.
Specifically:
```rust
pub enum SortOrderPushdownResult<T> {
Exact { inner: T },
Inexact { inner: T },
Unsupported,
}
```
The idea being that `Inexact` means "I changed myself to better satisfy the
sort order, but cannot guarantee it's perfectly sorted" (e.g. only re-arranged
files and row groups based on stats but not the actual data stream).
This matters because e.g. `Inexact` would already be a *huge* win for TopK +
dynamic filter pushdown.
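For illustration, a sketch of how an optimizer rule might consume the three outcomes. The enum is the one proposed above (with derives added); `Plan` and `rewrite_sort` are hypothetical toy stand-ins for the real plan tree and rule, assumed only for this example:

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum SortOrderPushdownResult<T> {
    Exact { inner: T },
    Inexact { inner: T },
    Unsupported,
}

/// Hypothetical toy plan node, for illustration only.
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan { reversed: bool },
    Sort { input: Box<Plan> },
}

/// Given `Sort(input)` and the result of pushing the sort into `input`,
/// decide what the rewritten plan looks like.
pub fn rewrite_sort(input: Plan, pushed: SortOrderPushdownResult<Plan>) -> Plan {
    match pushed {
        // Ordering is guaranteed: the sort can be dropped entirely.
        SortOrderPushdownResult::Exact { inner } => inner,
        // Ordering is only approximate: keep the sort for correctness, but
        // feed it the better-arranged input.
        SortOrderPushdownResult::Inexact { inner } => Plan::Sort { input: Box::new(inner) },
        // Nothing changed: keep the original plan.
        SortOrderPushdownResult::Unsupported => Plan::Sort { input: Box::new(input) },
    }
}
```

The `Inexact` arm is the interesting one: the sort stays in place, but its input now arrives in a better order.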
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]