This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 97e00efff3 #17411 Relax constraint that file sort order must only 
reference individual columns (#17419)
97e00efff3 is described below

commit 97e00efff3ef612624a1c1497269cfc387b44633
Author: Pepijn Van Eeckhoudt <[email protected]>
AuthorDate: Wed Oct 1 20:33:22 2025 +0200

    #17411 Relax constraint that file sort order must only reference individual 
columns (#17419)
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/catalog/src/stream.rs                   | 17 ++--
 datafusion/core/src/datasource/listing/table.rs    | 83 +++++++++++++------
 datafusion/datasource/src/file_scan_config.rs      | 42 +++-------
 datafusion/datasource/src/memory.rs                | 24 +-----
 datafusion/physical-expr/src/equivalence/class.rs  |  5 ++
 datafusion/physical-expr/src/equivalence/mod.rs    |  2 +-
 .../physical-expr/src/equivalence/projection.rs    | 94 +++++++++++++++++++++-
 datafusion/physical-expr/src/lib.rs                |  6 +-
 datafusion/physical-expr/src/physical_expr.rs      | 29 ++++++-
 datafusion/sqllogictest/data/composite_order.csv   |  8 ++
 datafusion/sqllogictest/test_files/order.slt       | 37 +++++++++
 11 files changed, 258 insertions(+), 89 deletions(-)

diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs
index 2d66ff4628..f4a2338b8e 100644
--- a/datafusion/catalog/src/stream.rs
+++ b/datafusion/catalog/src/stream.rs
@@ -34,7 +34,7 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
-use datafusion_physical_expr::create_ordering;
+use datafusion_physical_expr::create_lex_ordering;
 use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
 use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
@@ -321,17 +321,21 @@ impl TableProvider for StreamTable {
 
     async fn scan(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let projected_schema = match projection {
             Some(p) => {
-                let projected = self.0.source.schema().project(p)?;
-                create_ordering(&projected, &self.0.order)?
+                let projected = Arc::new(self.0.source.schema().project(p)?);
+                create_lex_ordering(&projected, &self.0.order, 
state.execution_props())?
             }
-            None => create_ordering(self.0.source.schema(), &self.0.order)?,
+            None => create_lex_ordering(
+                self.0.source.schema(),
+                &self.0.order,
+                state.execution_props(),
+            )?,
         };
 
         Ok(Arc::new(StreamingTableExec::try_new(
@@ -351,7 +355,8 @@ impl TableProvider for StreamTable {
         _insert_op: InsertOp,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let schema = self.0.source.schema();
-        let orders = create_ordering(schema, &self.0.order)?;
+        let orders =
+            create_lex_ordering(schema, &self.0.order, 
_state.execution_props())?;
         // It is sufficient to pass only one of the equivalent orderings:
         let ordering = orders.into_iter().next().map(Into::into);
 
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 08066d14d9..de58740bcd 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -23,7 +23,7 @@ use super::{
 };
 use crate::{
     datasource::file_format::{file_compression_type::FileCompressionType, 
FileFormat},
-    datasource::{create_ordering, physical_plan::FileSinkConfig},
+    datasource::physical_plan::FileSinkConfig,
     execution::context::SessionState,
 };
 use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
@@ -45,9 +45,11 @@ use datafusion_execution::{
     cache::{cache_manager::FileStatisticsCache, 
cache_unit::DefaultFileStatisticsCache},
     config::SessionConfig,
 };
+use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::{
     dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
 };
+use datafusion_physical_expr::create_lex_ordering;
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
@@ -55,6 +57,7 @@ use futures::{future, stream, Stream, StreamExt, 
TryStreamExt};
 use itertools::Itertools;
 use object_store::ObjectStore;
 use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc};
+
 /// Indicates the source of the schema for a [`ListingTable`]
 // PartialEq required for assert_eq! in tests
 #[derive(Debug, Clone, Copy, PartialEq, Default)]
@@ -1129,8 +1132,15 @@ impl ListingTable {
     }
 
     /// If file_sort_order is specified, creates the appropriate physical 
expressions
-    fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
-        create_ordering(&self.table_schema, &self.options.file_sort_order)
+    fn try_create_output_ordering(
+        &self,
+        execution_props: &ExecutionProps,
+    ) -> Result<Vec<LexOrdering>> {
+        create_lex_ordering(
+            &self.table_schema,
+            &self.options.file_sort_order,
+            execution_props,
+        )
     }
 }
 
@@ -1219,7 +1229,7 @@ impl TableProvider for ListingTable {
             return 
Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
         }
 
-        let output_ordering = self.try_create_output_ordering()?;
+        let output_ordering = 
self.try_create_output_ordering(state.execution_props())?;
         match state
             .config_options()
             .execution
@@ -1359,7 +1369,7 @@ impl TableProvider for ListingTable {
             file_extension: self.options().format.get_ext(),
         };
 
-        let orderings = self.try_create_output_ordering()?;
+        let orderings = 
self.try_create_output_ordering(state.execution_props())?;
         // It is sufficient to pass only one of the equivalent orderings:
         let order_requirements = orderings.into_iter().next().map(Into::into);
 
@@ -1587,6 +1597,7 @@ mod tests {
         SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
     };
     use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
+    use datafusion_physical_expr::expressions::binary;
     use datafusion_physical_expr::PhysicalSortExpr;
     use datafusion_physical_plan::{collect, ExecutionPlanProperties};
     use rstest::rstest;
@@ -1719,29 +1730,44 @@ mod tests {
 
         use crate::datasource::file_format::parquet::ParquetFormat;
         use datafusion_physical_plan::expressions::col as physical_col;
+        use datafusion_physical_plan::expressions::lit as physical_lit;
         use std::ops::Add;
 
         // (file_sort_order, expected_result)
         let cases = vec![
-            (vec![], Ok(Vec::<LexOrdering>::new())),
+            (
+                vec![],
+                Ok::<Vec<LexOrdering>, 
DataFusionError>(Vec::<LexOrdering>::new()),
+            ),
             // sort expr, but non column
             (
-                vec![vec![
-                    col("int_col").add(lit(1)).sort(true, true),
-                ]],
-                Err("Expected single column reference in sort_order[0][0], got 
int_col + Int32(1)"),
+                vec![vec![col("int_col").add(lit(1)).sort(true, true)]],
+                Ok(vec![[PhysicalSortExpr {
+                    expr: binary(
+                        physical_col("int_col", &schema).unwrap(),
+                        Operator::Plus,
+                        physical_lit(1),
+                        &schema,
+                    )
+                    .unwrap(),
+                    options: SortOptions {
+                        descending: false,
+                        nulls_first: true,
+                    },
+                }]
+                .into()]),
             ),
             // ok with one column
             (
                 vec![vec![col("string_col").sort(true, false)]],
                 Ok(vec![[PhysicalSortExpr {
-                            expr: physical_col("string_col", &schema).unwrap(),
-                            options: SortOptions {
-                                descending: false,
-                                nulls_first: false,
-                            },
-                        }].into(),
-                ])
+                    expr: physical_col("string_col", &schema).unwrap(),
+                    options: SortOptions {
+                        descending: false,
+                        nulls_first: false,
+                    },
+                }]
+                .into()]),
             ),
             // ok with two columns, different options
             (
@@ -1750,14 +1776,18 @@ mod tests {
                     col("int_col").sort(false, true),
                 ]],
                 Ok(vec![[
-                            
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
-                                        .asc()
-                                        .nulls_last(),
-                            
PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
-                                        .desc()
-                                        .nulls_first()
-                        ].into(),
-                ])
+                    PhysicalSortExpr::new_default(
+                        physical_col("string_col", &schema).unwrap(),
+                    )
+                    .asc()
+                    .nulls_last(),
+                    PhysicalSortExpr::new_default(
+                        physical_col("int_col", &schema).unwrap(),
+                    )
+                    .desc()
+                    .nulls_first(),
+                ]
+                .into()]),
             ),
         ];
 
@@ -1770,7 +1800,8 @@ mod tests {
 
             let table =
                 ListingTable::try_new(config.clone()).expect("Creating the 
table");
-            let ordering_result = table.try_create_output_ordering();
+            let ordering_result =
+                table.try_create_output_ordering(state.execution_props());
 
             match (expected_result, ordering_result) {
                 (Ok(expected), Ok(result)) => {
diff --git a/datafusion/datasource/src/file_scan_config.rs 
b/datafusion/datasource/src/file_scan_config.rs
index d1940402ee..79f0082d51 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -18,11 +18,6 @@
 //! [`FileScanConfig`] to configure scanning of possibly partitioned
 //! file sources.
 
-use std::{
-    any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
-    fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
-};
-
 use crate::file_groups::FileGroup;
 #[allow(unused_imports)]
 use crate::schema_adapter::SchemaAdapterFactory;
@@ -54,16 +49,21 @@ use datafusion_physical_expr::{expressions::Column, 
utils::reassign_expr_columns
 use datafusion_physical_expr::{split_conjunction, EquivalenceProperties, 
Partitioning};
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, 
PhysicalSortExpr};
-use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::projection::ProjectionExpr;
 use datafusion_physical_plan::{
     display::{display_orderings, ProjectSchemaDisplay},
+    filter_pushdown::FilterPushdownPropagation,
     metrics::ExecutionPlanMetricsSet,
     projection::{all_alias_free_columns, new_projections_for_columns},
     DisplayAs, DisplayFormatType,
 };
+use std::{
+    any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
+    fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
+};
 
+use datafusion_physical_expr::equivalence::project_orderings;
 use datafusion_physical_plan::coop::cooperative;
 use datafusion_physical_plan::execution_plan::SchedulingType;
 use log::{debug, warn};
@@ -1371,30 +1371,11 @@ fn get_projected_output_ordering(
     base_config: &FileScanConfig,
     projected_schema: &SchemaRef,
 ) -> Vec<LexOrdering> {
-    let mut all_orderings = vec![];
-    for output_ordering in &base_config.output_ordering {
-        let mut new_ordering = vec![];
-        for PhysicalSortExpr { expr, options } in output_ordering.iter() {
-            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
-                let name = col.name();
-                if let Some((idx, _)) = 
projected_schema.column_with_name(name) {
-                    // Compute the new sort expression (with correct index) 
after projection:
-                    new_ordering.push(PhysicalSortExpr::new(
-                        Arc::new(Column::new(name, idx)),
-                        *options,
-                    ));
-                    continue;
-                }
-            }
-            // Cannot find expression in the projected_schema, stop iterating
-            // since rest of the orderings are violated
-            break;
-        }
-
-        let Some(new_ordering) = LexOrdering::new(new_ordering) else {
-            continue;
-        };
+    let projected_orderings =
+        project_orderings(&base_config.output_ordering, projected_schema);
 
+    let mut all_orderings = vec![];
+    for new_ordering in projected_orderings {
         // Check if any file groups are not sorted
         if base_config.file_groups.iter().any(|group| {
             if group.len() <= 1 {
@@ -1467,6 +1448,7 @@ mod tests {
     use datafusion_expr::{Operator, SortExpr};
     use datafusion_physical_expr::create_physical_sort_expr;
     use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
+    use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
 
     /// Returns the column names on the schema
     pub fn columns(schema: &Schema) -> Vec<String> {
diff --git a/datafusion/datasource/src/memory.rs 
b/datafusion/datasource/src/memory.rs
index 564033438f..eb55aa9b0b 100644
--- a/datafusion/datasource/src/memory.rs
+++ b/datafusion/datasource/src/memory.rs
@@ -30,10 +30,7 @@ use arrow::array::{RecordBatch, RecordBatchOptions};
 use arrow::datatypes::{Schema, SchemaRef};
 use datafusion_common::{internal_err, plan_err, project_schema, Result, 
ScalarValue};
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::equivalence::{
-    OrderingEquivalenceClass, ProjectionMapping,
-};
-use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::equivalence::project_orderings;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
 use datafusion_physical_plan::memory::MemoryStream;
@@ -433,22 +430,9 @@ impl MemorySourceConfig {
         }
 
         // If there is a projection on the source, we also need to project 
orderings
-        if let Some(projection) = &self.projection {
-            let base_schema = self.original_schema();
-            let proj_exprs = projection.iter().map(|idx| {
-                let name = base_schema.field(*idx).name();
-                (Arc::new(Column::new(name, *idx)) as _, name.to_string())
-            });
-            let projection_mapping =
-                ProjectionMapping::try_new(proj_exprs, &base_schema)?;
-            let base_eqp = EquivalenceProperties::new_with_orderings(
-                Arc::clone(&base_schema),
-                sort_information,
-            );
-            let proj_eqp =
-                base_eqp.project(&projection_mapping, 
Arc::clone(&self.projected_schema));
-            let oeq_class: OrderingEquivalenceClass = proj_eqp.into();
-            sort_information = oeq_class.into();
+        if self.projection.is_some() {
+            sort_information =
+                project_orderings(&sort_information, &self.projected_schema);
         }
 
         self.sort_information = sort_information;
diff --git a/datafusion/physical-expr/src/equivalence/class.rs 
b/datafusion/physical-expr/src/equivalence/class.rs
index 8af6f3be03..66ce77ef41 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -590,6 +590,11 @@ impl EquivalenceGroup {
         aug_mapping: &AugmentedMapping,
         expr: &Arc<dyn PhysicalExpr>,
     ) -> Option<Arc<dyn PhysicalExpr>> {
+        // Literals don't need to be projected
+        if expr.as_any().downcast_ref::<Literal>().is_some() {
+            return Some(Arc::clone(expr));
+        }
+
         // The given expression is not inside the mapping, so we try to project
         // indirectly using equivalence classes.
         for (targets, eq_class) in aug_mapping.values() {
diff --git a/datafusion/physical-expr/src/equivalence/mod.rs 
b/datafusion/physical-expr/src/equivalence/mod.rs
index ecb73be256..bcc6835e2f 100644
--- a/datafusion/physical-expr/src/equivalence/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/mod.rs
@@ -30,7 +30,7 @@ mod properties;
 
 pub use class::{AcrossPartitions, ConstExpr, EquivalenceClass, 
EquivalenceGroup};
 pub use ordering::OrderingEquivalenceClass;
-pub use projection::ProjectionMapping;
+pub use projection::{project_ordering, project_orderings, ProjectionMapping};
 pub use properties::{
     calculate_union, join_equivalence_properties, EquivalenceProperties,
 };
diff --git a/datafusion/physical-expr/src/equivalence/projection.rs 
b/datafusion/physical-expr/src/equivalence/projection.rs
index 6fe5605229..a4ed8187cf 100644
--- a/datafusion/physical-expr/src/equivalence/projection.rs
+++ b/datafusion/physical-expr/src/equivalence/projection.rs
@@ -23,8 +23,9 @@ use crate::PhysicalExpr;
 
 use arrow::datatypes::SchemaRef;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{internal_err, Result};
+use datafusion_common::{internal_err, plan_err, Result};
 
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, 
PhysicalSortExpr};
 use indexmap::IndexMap;
 
 /// Stores target expressions, along with their indices, that associate with a
@@ -156,6 +157,97 @@ impl FromIterator<(Arc<dyn PhysicalExpr>, 
ProjectionTargets)> for ProjectionMapp
     }
 }
 
+/// Projects a slice of [LexOrdering]s onto the given schema.
+///
+/// This is a convenience wrapper that applies [project_ordering] to each
+/// input ordering and collects the successful projections:
+/// - For each input ordering, the result of [project_ordering] is appended to
+///   the output if it is `Some(...)`.
+/// - Order is preserved and no deduplication is attempted.
+/// - If none of the input orderings can be projected, an empty `Vec` is
+///   returned.
+///
+/// See [project_ordering] for the semantics of projecting a single
+/// [LexOrdering].
+pub fn project_orderings(
+    orderings: &[LexOrdering],
+    schema: &SchemaRef,
+) -> Vec<LexOrdering> {
+    let mut projected_orderings = vec![];
+
+    for ordering in orderings {
+        projected_orderings.extend(project_ordering(ordering, schema));
+    }
+
+    projected_orderings
+}
+
+/// Projects a single [LexOrdering] onto the given schema.
+///
+/// This function attempts to rewrite every [PhysicalSortExpr] in the provided
+/// [LexOrdering] so that any [Column] expressions point at the correct field
+/// indices in `schema`.
+///
+/// Key details:
+/// - Columns are matched by name, not by index. The index of each matched
+///   column is looked up with 
[Schema::column_with_name](arrow::datatypes::Schema::column_with_name) and a new
+///   [Column] with the correct [index](Column::index) is substituted.
+/// - If an expression references a column name that does not exist in
+///   `schema`, projection of the current ordering stops and only the already
+///   rewritten prefix is kept. This models the fact that a lexicographical
+///   ordering remains valid for any leading prefix whose expressions are
+///   present in the projected schema.
+/// - If no expressions can be projected (i.e. the first one is missing), the
+///   function returns `None`.
+///
+/// Return value:
+/// - `Some(LexOrdering)` if at least one sort expression could be projected.
+///   The returned ordering may be a strict prefix of the input ordering.
+/// - `None` if no part of the ordering can be projected onto `schema`.
+///
+/// Example
+///
+/// Suppose we have an input ordering `[col("a@1"), col("b@2")]` and the projected
+/// schema contains a (at index 0) but not b. The result will be `Some([col("a@0")])`:
+/// the column reference is reindexed to match the projected schema and b is dropped.
+/// If a is not present, the result will be None, even if b is present.
+pub fn project_ordering(
+    ordering: &LexOrdering,
+    schema: &SchemaRef,
+) -> Option<LexOrdering> {
+    let mut projected_exprs = vec![];
+    for PhysicalSortExpr { expr, options } in ordering.iter() {
+        let transformed = Arc::clone(expr).transform_up(|expr| {
+            let Some(col) = expr.as_any().downcast_ref::<Column>() else {
+                return Ok(Transformed::no(expr));
+            };
+
+            let name = col.name();
+            if let Some((idx, _)) = schema.column_with_name(name) {
+                // Compute the new column expression (with correct index) 
after projection:
+                Ok(Transformed::yes(Arc::new(Column::new(name, idx))))
+            } else {
+                // Cannot find expression in the projected_schema,
+                // signal this using an Err result
+                plan_err!("")
+            }
+        });
+
+        match transformed {
+            Ok(transformed) => {
+                projected_exprs.push(PhysicalSortExpr::new(transformed.data, 
*options));
+            }
+            Err(_) => {
+                // Err result indicates an expression could not be found in the
+                // projected_schema, stop iterating since rest of the 
orderings are violated
+                break;
+            }
+        }
+    }
+
+    LexOrdering::new(projected_exprs)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/physical-expr/src/lib.rs 
b/datafusion/physical-expr/src/lib.rs
index 50a8e109ae..468591d34d 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -56,9 +56,9 @@ pub use equivalence::{
 };
 pub use partitioning::{Distribution, Partitioning};
 pub use physical_expr::{
-    add_offset_to_expr, add_offset_to_physical_sort_exprs, create_ordering,
-    create_physical_sort_expr, create_physical_sort_exprs, 
physical_exprs_bag_equal,
-    physical_exprs_contains, physical_exprs_equal,
+    add_offset_to_expr, add_offset_to_physical_sort_exprs, create_lex_ordering,
+    create_ordering, create_physical_sort_expr, create_physical_sort_exprs,
+    physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
 };
 
 pub use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, 
PhysicalExprRef};
diff --git a/datafusion/physical-expr/src/physical_expr.rs 
b/datafusion/physical-expr/src/physical_expr.rs
index 3f063d7a03..2cc484ec6a 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -21,7 +21,7 @@ use crate::expressions::{self, Column};
 use crate::{create_physical_expr, LexOrdering, PhysicalSortExpr};
 
 use arrow::compute::SortOptions;
-use arrow::datatypes::Schema;
+use arrow::datatypes::{Schema, SchemaRef};
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{plan_err, Result};
 use datafusion_common::{DFSchema, HashMap};
@@ -29,7 +29,6 @@ use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::{Expr, SortExpr};
 
 use itertools::izip;
-
 // Exports:
 pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 
@@ -163,6 +162,32 @@ pub fn create_ordering(
     Ok(all_sort_orders)
 }
 
+/// Creates one [LexOrdering] per non-empty list of logical sort expressions in `sort_order`
+pub fn create_lex_ordering(
+    schema: &SchemaRef,
+    sort_order: &[Vec<SortExpr>],
+    execution_props: &ExecutionProps,
+) -> Result<Vec<LexOrdering>> {
+    // Try the fast path that only supports column references first
+    // This avoids creating a DFSchema
+    if let Ok(ordering) = create_ordering(schema, sort_order) {
+        return Ok(ordering);
+    }
+
+    let df_schema = DFSchema::try_from(Arc::clone(schema))?;
+
+    let mut all_sort_orders = vec![];
+
+    for exprs in sort_order.iter() {
+        all_sort_orders.extend(LexOrdering::new(create_physical_sort_exprs(
+            exprs,
+            &df_schema,
+            execution_props,
+        )?));
+    }
+    Ok(all_sort_orders)
+}
+
 /// Create a physical sort expression from a logical expression
 pub fn create_physical_sort_expr(
     e: &SortExpr,
diff --git a/datafusion/sqllogictest/data/composite_order.csv 
b/datafusion/sqllogictest/data/composite_order.csv
new file mode 100644
index 0000000000..b2c5e881bd
--- /dev/null
+++ b/datafusion/sqllogictest/data/composite_order.csv
@@ -0,0 +1,8 @@
+a,b
+1,0
+0,2
+1,2
+0,4
+5,0
+3,3
+4,3
diff --git a/datafusion/sqllogictest/test_files/order.slt 
b/datafusion/sqllogictest/test_files/order.slt
index 1050b59613..04a7615c76 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -1517,3 +1517,40 @@ SELECT address, zip FROM addresses ORDER BY ALL;
 111 Duck Duck Goose Ln 11111
 111 Duck Duck Goose Ln 11111-0001
 123 Quack Blvd 11111
+
+# Create a table with an order clause that's not a simple column reference
+statement ok
+CREATE EXTERNAL TABLE ordered (
+  a  BIGINT NOT NULL,
+  b  BIGINT NOT NULL
+)
+STORED AS CSV
+LOCATION 'data/composite_order.csv'
+OPTIONS ('format.has_header' 'true')
+WITH ORDER (a + b);
+
+# Simple query should be just a table scan
+query TT
+EXPLAIN SELECT * from ordered;
+----
+physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/data/composite_order.csv]]}, 
projection=[a, b], output_ordering=[a@0 + b@1 ASC NULLS LAST], file_type=csv, 
has_header=true
+
+# Query ordered by the declared order should be just a table scan
+query TT
+EXPLAIN SELECT * from ordered ORDER BY (a + b);
+----
+physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/data/composite_order.csv]]}, 
projection=[a, b], output_ordering=[a@0 + b@1 ASC NULLS LAST], file_type=csv, 
has_header=true
+
+# Order equivalence handling should make this query a simple table scan
+query TT
+EXPLAIN SELECT * from ordered ORDER BY -(a + b) desc nulls last;
+----
+physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/data/composite_order.csv]]}, 
projection=[a, b], output_ordering=[a@0 + b@1 ASC NULLS LAST], file_type=csv, 
has_header=true
+
+# Ordering by another column requires a sort
+query TT
+EXPLAIN SELECT * from ordered ORDER BY a;
+----
+physical_plan
+01)SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/data/composite_order.csv]]}, 
projection=[a, b], output_ordering=[a@0 + b@1 ASC NULLS LAST], file_type=csv, 
has_header=true


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to