This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 97e00efff3 #17411 Relax constraint that file sort order must only
reference individual columns (#17419)
97e00efff3 is described below
commit 97e00efff3ef612624a1c1497269cfc387b44633
Author: Pepijn Van Eeckhoudt <[email protected]>
AuthorDate: Wed Oct 1 20:33:22 2025 +0200
#17411 Relax constraint that file sort order must only reference individual
columns (#17419)
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/catalog/src/stream.rs | 17 ++--
datafusion/core/src/datasource/listing/table.rs | 83 +++++++++++++------
datafusion/datasource/src/file_scan_config.rs | 42 +++-------
datafusion/datasource/src/memory.rs | 24 +-----
datafusion/physical-expr/src/equivalence/class.rs | 5 ++
datafusion/physical-expr/src/equivalence/mod.rs | 2 +-
.../physical-expr/src/equivalence/projection.rs | 94 +++++++++++++++++++++-
datafusion/physical-expr/src/lib.rs | 6 +-
datafusion/physical-expr/src/physical_expr.rs | 29 ++++++-
datafusion/sqllogictest/data/composite_order.csv | 8 ++
datafusion/sqllogictest/test_files/order.slt | 37 +++++++++
11 files changed, 258 insertions(+), 89 deletions(-)
diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs
index 2d66ff4628..f4a2338b8e 100644
--- a/datafusion/catalog/src/stream.rs
+++ b/datafusion/catalog/src/stream.rs
@@ -34,7 +34,7 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
-use datafusion_physical_expr::create_ordering;
+use datafusion_physical_expr::create_lex_ordering;
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
@@ -321,17 +321,21 @@ impl TableProvider for StreamTable {
async fn scan(
&self,
- _state: &dyn Session,
+ state: &dyn Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let projected_schema = match projection {
Some(p) => {
- let projected = self.0.source.schema().project(p)?;
- create_ordering(&projected, &self.0.order)?
+ let projected = Arc::new(self.0.source.schema().project(p)?);
+ create_lex_ordering(&projected, &self.0.order,
state.execution_props())?
}
- None => create_ordering(self.0.source.schema(), &self.0.order)?,
+ None => create_lex_ordering(
+ self.0.source.schema(),
+ &self.0.order,
+ state.execution_props(),
+ )?,
};
Ok(Arc::new(StreamingTableExec::try_new(
@@ -351,7 +355,8 @@ impl TableProvider for StreamTable {
_insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
let schema = self.0.source.schema();
- let orders = create_ordering(schema, &self.0.order)?;
+ let orders =
+ create_lex_ordering(schema, &self.0.order,
_state.execution_props())?;
// It is sufficient to pass only one of the equivalent orderings:
let ordering = orders.into_iter().next().map(Into::into);
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 08066d14d9..de58740bcd 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -23,7 +23,7 @@ use super::{
};
use crate::{
datasource::file_format::{file_compression_type::FileCompressionType,
FileFormat},
- datasource::{create_ordering, physical_plan::FileSinkConfig},
+ datasource::physical_plan::FileSinkConfig,
execution::context::SessionState,
};
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
@@ -45,9 +45,11 @@ use datafusion_execution::{
cache::{cache_manager::FileStatisticsCache,
cache_unit::DefaultFileStatisticsCache},
config::SessionConfig,
};
+use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{
dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
};
+use datafusion_physical_expr::create_lex_ordering;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
@@ -55,6 +57,7 @@ use futures::{future, stream, Stream, StreamExt,
TryStreamExt};
use itertools::Itertools;
use object_store::ObjectStore;
use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc};
+
/// Indicates the source of the schema for a [`ListingTable`]
// PartialEq required for assert_eq! in tests
#[derive(Debug, Clone, Copy, PartialEq, Default)]
@@ -1129,8 +1132,15 @@ impl ListingTable {
}
/// If file_sort_order is specified, creates the appropriate physical
expressions
- fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
- create_ordering(&self.table_schema, &self.options.file_sort_order)
+ fn try_create_output_ordering(
+ &self,
+ execution_props: &ExecutionProps,
+ ) -> Result<Vec<LexOrdering>> {
+ create_lex_ordering(
+ &self.table_schema,
+ &self.options.file_sort_order,
+ execution_props,
+ )
}
}
@@ -1219,7 +1229,7 @@ impl TableProvider for ListingTable {
return
Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
}
- let output_ordering = self.try_create_output_ordering()?;
+ let output_ordering =
self.try_create_output_ordering(state.execution_props())?;
match state
.config_options()
.execution
@@ -1359,7 +1369,7 @@ impl TableProvider for ListingTable {
file_extension: self.options().format.get_ext(),
};
- let orderings = self.try_create_output_ordering()?;
+ let orderings =
self.try_create_output_ordering(state.execution_props())?;
// It is sufficient to pass only one of the equivalent orderings:
let order_requirements = orderings.into_iter().next().map(Into::into);
@@ -1587,6 +1597,7 @@ mod tests {
SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};
use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
+ use datafusion_physical_expr::expressions::binary;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::{collect, ExecutionPlanProperties};
use rstest::rstest;
@@ -1719,29 +1730,44 @@ mod tests {
use crate::datasource::file_format::parquet::ParquetFormat;
use datafusion_physical_plan::expressions::col as physical_col;
+ use datafusion_physical_plan::expressions::lit as physical_lit;
use std::ops::Add;
// (file_sort_order, expected_result)
let cases = vec![
- (vec![], Ok(Vec::<LexOrdering>::new())),
+ (
+ vec![],
+ Ok::<Vec<LexOrdering>,
DataFusionError>(Vec::<LexOrdering>::new()),
+ ),
// sort expr, but non column
(
- vec![vec![
- col("int_col").add(lit(1)).sort(true, true),
- ]],
- Err("Expected single column reference in sort_order[0][0], got
int_col + Int32(1)"),
+ vec![vec![col("int_col").add(lit(1)).sort(true, true)]],
+ Ok(vec![[PhysicalSortExpr {
+ expr: binary(
+ physical_col("int_col", &schema).unwrap(),
+ Operator::Plus,
+ physical_lit(1),
+ &schema,
+ )
+ .unwrap(),
+ options: SortOptions {
+ descending: false,
+ nulls_first: true,
+ },
+ }]
+ .into()]),
),
// ok with one column
(
vec![vec![col("string_col").sort(true, false)]],
Ok(vec![[PhysicalSortExpr {
- expr: physical_col("string_col", &schema).unwrap(),
- options: SortOptions {
- descending: false,
- nulls_first: false,
- },
- }].into(),
- ])
+ expr: physical_col("string_col", &schema).unwrap(),
+ options: SortOptions {
+ descending: false,
+ nulls_first: false,
+ },
+ }]
+ .into()]),
),
// ok with two columns, different options
(
@@ -1750,14 +1776,18 @@ mod tests {
col("int_col").sort(false, true),
]],
Ok(vec![[
-
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
- .asc()
- .nulls_last(),
-
PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
- .desc()
- .nulls_first()
- ].into(),
- ])
+ PhysicalSortExpr::new_default(
+ physical_col("string_col", &schema).unwrap(),
+ )
+ .asc()
+ .nulls_last(),
+ PhysicalSortExpr::new_default(
+ physical_col("int_col", &schema).unwrap(),
+ )
+ .desc()
+ .nulls_first(),
+ ]
+ .into()]),
),
];
@@ -1770,7 +1800,8 @@ mod tests {
let table =
ListingTable::try_new(config.clone()).expect("Creating the
table");
- let ordering_result = table.try_create_output_ordering();
+ let ordering_result =
+ table.try_create_output_ordering(state.execution_props());
match (expected_result, ordering_result) {
(Ok(expected), Ok(result)) => {
diff --git a/datafusion/datasource/src/file_scan_config.rs
b/datafusion/datasource/src/file_scan_config.rs
index d1940402ee..79f0082d51 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -18,11 +18,6 @@
//! [`FileScanConfig`] to configure scanning of possibly partitioned
//! file sources.
-use std::{
- any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
- fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
-};
-
use crate::file_groups::FileGroup;
#[allow(unused_imports)]
use crate::schema_adapter::SchemaAdapterFactory;
@@ -54,16 +49,21 @@ use datafusion_physical_expr::{expressions::Column,
utils::reassign_expr_columns
use datafusion_physical_expr::{split_conjunction, EquivalenceProperties,
Partitioning};
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering,
PhysicalSortExpr};
-use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::projection::ProjectionExpr;
use datafusion_physical_plan::{
display::{display_orderings, ProjectSchemaDisplay},
+ filter_pushdown::FilterPushdownPropagation,
metrics::ExecutionPlanMetricsSet,
projection::{all_alias_free_columns, new_projections_for_columns},
DisplayAs, DisplayFormatType,
};
+use std::{
+ any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
+ fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
+};
+use datafusion_physical_expr::equivalence::project_orderings;
use datafusion_physical_plan::coop::cooperative;
use datafusion_physical_plan::execution_plan::SchedulingType;
use log::{debug, warn};
@@ -1371,30 +1371,11 @@ fn get_projected_output_ordering(
base_config: &FileScanConfig,
projected_schema: &SchemaRef,
) -> Vec<LexOrdering> {
- let mut all_orderings = vec![];
- for output_ordering in &base_config.output_ordering {
- let mut new_ordering = vec![];
- for PhysicalSortExpr { expr, options } in output_ordering.iter() {
- if let Some(col) = expr.as_any().downcast_ref::<Column>() {
- let name = col.name();
- if let Some((idx, _)) =
projected_schema.column_with_name(name) {
- // Compute the new sort expression (with correct index)
after projection:
- new_ordering.push(PhysicalSortExpr::new(
- Arc::new(Column::new(name, idx)),
- *options,
- ));
- continue;
- }
- }
- // Cannot find expression in the projected_schema, stop iterating
- // since rest of the orderings are violated
- break;
- }
-
- let Some(new_ordering) = LexOrdering::new(new_ordering) else {
- continue;
- };
+ let projected_orderings =
+ project_orderings(&base_config.output_ordering, projected_schema);
+ let mut all_orderings = vec![];
+ for new_ordering in projected_orderings {
// Check if any file groups are not sorted
if base_config.file_groups.iter().any(|group| {
if group.len() <= 1 {
@@ -1467,6 +1448,7 @@ mod tests {
use datafusion_expr::{Operator, SortExpr};
use datafusion_physical_expr::create_physical_sort_expr;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
+ use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
/// Returns the column names on the schema
pub fn columns(schema: &Schema) -> Vec<String> {
diff --git a/datafusion/datasource/src/memory.rs
b/datafusion/datasource/src/memory.rs
index 564033438f..eb55aa9b0b 100644
--- a/datafusion/datasource/src/memory.rs
+++ b/datafusion/datasource/src/memory.rs
@@ -30,10 +30,7 @@ use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::{internal_err, plan_err, project_schema, Result,
ScalarValue};
use datafusion_execution::TaskContext;
-use datafusion_physical_expr::equivalence::{
- OrderingEquivalenceClass, ProjectionMapping,
-};
-use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::equivalence::project_orderings;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_plan::memory::MemoryStream;
@@ -433,22 +430,9 @@ impl MemorySourceConfig {
}
// If there is a projection on the source, we also need to project
orderings
- if let Some(projection) = &self.projection {
- let base_schema = self.original_schema();
- let proj_exprs = projection.iter().map(|idx| {
- let name = base_schema.field(*idx).name();
- (Arc::new(Column::new(name, *idx)) as _, name.to_string())
- });
- let projection_mapping =
- ProjectionMapping::try_new(proj_exprs, &base_schema)?;
- let base_eqp = EquivalenceProperties::new_with_orderings(
- Arc::clone(&base_schema),
- sort_information,
- );
- let proj_eqp =
- base_eqp.project(&projection_mapping,
Arc::clone(&self.projected_schema));
- let oeq_class: OrderingEquivalenceClass = proj_eqp.into();
- sort_information = oeq_class.into();
+ if self.projection.is_some() {
+ sort_information =
+ project_orderings(&sort_information, &self.projected_schema);
}
self.sort_information = sort_information;
diff --git a/datafusion/physical-expr/src/equivalence/class.rs
b/datafusion/physical-expr/src/equivalence/class.rs
index 8af6f3be03..66ce77ef41 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -590,6 +590,11 @@ impl EquivalenceGroup {
aug_mapping: &AugmentedMapping,
expr: &Arc<dyn PhysicalExpr>,
) -> Option<Arc<dyn PhysicalExpr>> {
+ // Literals don't need to be projected
+ if expr.as_any().downcast_ref::<Literal>().is_some() {
+ return Some(Arc::clone(expr));
+ }
+
// The given expression is not inside the mapping, so we try to project
// indirectly using equivalence classes.
for (targets, eq_class) in aug_mapping.values() {
diff --git a/datafusion/physical-expr/src/equivalence/mod.rs
b/datafusion/physical-expr/src/equivalence/mod.rs
index ecb73be256..bcc6835e2f 100644
--- a/datafusion/physical-expr/src/equivalence/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/mod.rs
@@ -30,7 +30,7 @@ mod properties;
pub use class::{AcrossPartitions, ConstExpr, EquivalenceClass,
EquivalenceGroup};
pub use ordering::OrderingEquivalenceClass;
-pub use projection::ProjectionMapping;
+pub use projection::{project_ordering, project_orderings, ProjectionMapping};
pub use properties::{
calculate_union, join_equivalence_properties, EquivalenceProperties,
};
diff --git a/datafusion/physical-expr/src/equivalence/projection.rs
b/datafusion/physical-expr/src/equivalence/projection.rs
index 6fe5605229..a4ed8187cf 100644
--- a/datafusion/physical-expr/src/equivalence/projection.rs
+++ b/datafusion/physical-expr/src/equivalence/projection.rs
@@ -23,8 +23,9 @@ use crate::PhysicalExpr;
use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{internal_err, Result};
+use datafusion_common::{internal_err, plan_err, Result};
+use datafusion_physical_expr_common::sort_expr::{LexOrdering,
PhysicalSortExpr};
use indexmap::IndexMap;
/// Stores target expressions, along with their indices, that associate with a
@@ -156,6 +157,97 @@ impl FromIterator<(Arc<dyn PhysicalExpr>,
ProjectionTargets)> for ProjectionMapp
}
}
+/// Projects a slice of [LexOrdering]s onto the given schema.
+///
+/// This is a convenience wrapper that applies [project_ordering] to each
+/// input ordering and collects the successful projections:
+/// - For each input ordering, the result of [project_ordering] is appended to
+/// the output if it is `Some(...)`.
+/// - Order is preserved and no deduplication is attempted.
+/// - If none of the input orderings can be projected, an empty `Vec` is
+/// returned.
+///
+/// See [project_ordering] for the semantics of projecting a single
+/// [LexOrdering].
+pub fn project_orderings(
+ orderings: &[LexOrdering],
+ schema: &SchemaRef,
+) -> Vec<LexOrdering> {
+ let mut projected_orderings = vec![];
+
+ for ordering in orderings {
+ projected_orderings.extend(project_ordering(ordering, schema));
+ }
+
+ projected_orderings
+}
+
+/// Projects a single [LexOrdering] onto the given schema.
+///
+/// This function attempts to rewrite every [PhysicalSortExpr] in the provided
+/// [LexOrdering] so that any [Column] expressions point at the correct field
+/// indices in `schema`.
+///
+/// Key details:
+/// - Columns are matched by name, not by index. The index of each matched
+/// column is looked up with
[Schema::column_with_name](arrow::datatypes::Schema::column_with_name) and a new
+/// [Column] with the correct [index](Column::index) is substituted.
+/// - If an expression references a column name that does not exist in
+/// `schema`, projection of the current ordering stops and only the already
+/// rewritten prefix is kept. This models the fact that a lexicographical
+/// ordering remains valid for any leading prefix whose expressions are
+/// present in the projected schema.
+/// - If no expressions can be projected (i.e. the first one is missing), the
+/// function returns `None`.
+///
+/// Return value:
+/// - `Some(LexOrdering)` if at least one sort expression could be projected.
+/// The returned ordering may be a strict prefix of the input ordering.
+/// - `None` if no part of the ordering can be projected onto `schema`.
+///
+/// Example
+///
+/// Suppose we have an input ordering `[col("a@0"), col("b@1")]` but the
projected
+/// schema only contains b and not a. The result will be `Some([col("a@0")])`.
In other
+/// words, the column reference is reindexed to match the projected schema.
+/// If neither a nor b is present, the result will be None.
+pub fn project_ordering(
+ ordering: &LexOrdering,
+ schema: &SchemaRef,
+) -> Option<LexOrdering> {
+ let mut projected_exprs = vec![];
+ for PhysicalSortExpr { expr, options } in ordering.iter() {
+ let transformed = Arc::clone(expr).transform_up(|expr| {
+ let Some(col) = expr.as_any().downcast_ref::<Column>() else {
+ return Ok(Transformed::no(expr));
+ };
+
+ let name = col.name();
+ if let Some((idx, _)) = schema.column_with_name(name) {
+ // Compute the new column expression (with correct index)
after projection:
+ Ok(Transformed::yes(Arc::new(Column::new(name, idx))))
+ } else {
+ // Cannot find expression in the projected_schema,
+ // signal this using an Err result
+ plan_err!("")
+ }
+ });
+
+ match transformed {
+ Ok(transformed) => {
+ projected_exprs.push(PhysicalSortExpr::new(transformed.data,
*options));
+ }
+ Err(_) => {
+ // Err result indicates an expression could not be found in the
+ // projected_schema, stop iterating since rest of the
orderings are violated
+ break;
+ }
+ }
+ }
+
+ LexOrdering::new(projected_exprs)
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/datafusion/physical-expr/src/lib.rs
b/datafusion/physical-expr/src/lib.rs
index 50a8e109ae..468591d34d 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -56,9 +56,9 @@ pub use equivalence::{
};
pub use partitioning::{Distribution, Partitioning};
pub use physical_expr::{
- add_offset_to_expr, add_offset_to_physical_sort_exprs, create_ordering,
- create_physical_sort_expr, create_physical_sort_exprs,
physical_exprs_bag_equal,
- physical_exprs_contains, physical_exprs_equal,
+ add_offset_to_expr, add_offset_to_physical_sort_exprs, create_lex_ordering,
+ create_ordering, create_physical_sort_expr, create_physical_sort_exprs,
+ physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
};
pub use datafusion_physical_expr_common::physical_expr::{PhysicalExpr,
PhysicalExprRef};
diff --git a/datafusion/physical-expr/src/physical_expr.rs
b/datafusion/physical-expr/src/physical_expr.rs
index 3f063d7a03..2cc484ec6a 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -21,7 +21,7 @@ use crate::expressions::{self, Column};
use crate::{create_physical_expr, LexOrdering, PhysicalSortExpr};
use arrow::compute::SortOptions;
-use arrow::datatypes::Schema;
+use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{plan_err, Result};
use datafusion_common::{DFSchema, HashMap};
@@ -29,7 +29,6 @@ use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{Expr, SortExpr};
use itertools::izip;
-
// Exports:
pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
@@ -163,6 +162,32 @@ pub fn create_ordering(
Ok(all_sort_orders)
}
+/// Creates a vector of [LexOrdering] from a vector of logical expression
+pub fn create_lex_ordering(
+ schema: &SchemaRef,
+ sort_order: &[Vec<SortExpr>],
+ execution_props: &ExecutionProps,
+) -> Result<Vec<LexOrdering>> {
+ // Try the fast path that only supports column references first
+ // This avoids creating a DFSchema
+ if let Ok(ordering) = create_ordering(schema, sort_order) {
+ return Ok(ordering);
+ }
+
+ let df_schema = DFSchema::try_from(Arc::clone(schema))?;
+
+ let mut all_sort_orders = vec![];
+
+ for exprs in sort_order.iter() {
+ all_sort_orders.extend(LexOrdering::new(create_physical_sort_exprs(
+ exprs,
+ &df_schema,
+ execution_props,
+ )?));
+ }
+ Ok(all_sort_orders)
+}
+
/// Create a physical sort expression from a logical expression
pub fn create_physical_sort_expr(
e: &SortExpr,
diff --git a/datafusion/sqllogictest/data/composite_order.csv
b/datafusion/sqllogictest/data/composite_order.csv
new file mode 100644
index 0000000000..b2c5e881bd
--- /dev/null
+++ b/datafusion/sqllogictest/data/composite_order.csv
@@ -0,0 +1,8 @@
+a,b
+1,0
+0,2
+1,2
+0,4
+5,0
+3,3
+4,3
diff --git a/datafusion/sqllogictest/test_files/order.slt
b/datafusion/sqllogictest/test_files/order.slt
index 1050b59613..04a7615c76 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -1517,3 +1517,40 @@ SELECT address, zip FROM addresses ORDER BY ALL;
111 Duck Duck Goose Ln 11111
111 Duck Duck Goose Ln 11111-0001
123 Quack Blvd 11111
+
+# Create a table with an order clause that's not a simple column reference
+statement ok
+CREATE EXTERNAL TABLE ordered (
+ a BIGINT NOT NULL,
+ b BIGINT NOT NULL
+)
+STORED AS CSV
+LOCATION 'data/composite_order.csv'
+OPTIONS ('format.has_header' 'true')
+WITH ORDER (a + b);
+
+# Simple query should be just a table scan
+query TT
+EXPLAIN SELECT * from ordered;
+----
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/data/composite_order.csv]]},
projection=[a, b], output_ordering=[a@0 + b@1 ASC NULLS LAST], file_type=csv,
has_header=true
+
+# Query ordered by the declared order should be just a table scan
+query TT
+EXPLAIN SELECT * from ordered ORDER BY (a + b);
+----
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/data/composite_order.csv]]},
projection=[a, b], output_ordering=[a@0 + b@1 ASC NULLS LAST], file_type=csv,
has_header=true
+
+# Order equivalence handling should make this query a simple table scan
+query TT
+EXPLAIN SELECT * from ordered ORDER BY -(a + b) desc nulls last;
+----
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/data/composite_order.csv]]},
projection=[a, b], output_ordering=[a@0 + b@1 ASC NULLS LAST], file_type=csv,
has_header=true
+
+# Ordering by another column requires a sort
+query TT
+EXPLAIN SELECT * from ordered ORDER BY a;
+----
+physical_plan
+01)SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/data/composite_order.csv]]},
projection=[a, b], output_ordering=[a@0 + b@1 ASC NULLS LAST], file_type=csv,
has_header=true
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]