This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e353eb08fb Project sort expressions in StreamingTable (#19719)
e353eb08fb is described below

commit e353eb08fb57570ce87ff29a00743ffcdf40e1a3
Author: Tim Saucer <[email protected]>
AuthorDate: Fri Jan 16 11:01:42 2026 -0500

    Project sort expressions in StreamingTable (#19719)
    
    ## Which issue does this PR close?
    
    - Closes #19717
    
    ## Rationale for this change
    
    If a `StreamingTable` has both physical sort expressions and a
    projection, executing the plan produces errors when the sort
    expressions are not included in the projection.
    
    ## What changes are included in this PR?
    
    When both a projection and physical sort expressions are present,
    project the schema and rewrite the sort expressions against the
    projected schema.
    
    ## Are these changes tested?
    
    Unit test added.
    
    ## Are there any user-facing changes?
    
    No
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/catalog/src/streaming.rs                | 26 ++++++++---
 .../tests/physical_optimizer/enforce_sorting.rs    | 50 ++++++++++++++++++++--
 2 files changed, 67 insertions(+), 9 deletions(-)

diff --git a/datafusion/catalog/src/streaming.rs b/datafusion/catalog/src/streaming.rs
index 31669171b2..db9596b420 100644
--- a/datafusion/catalog/src/streaming.rs
+++ b/datafusion/catalog/src/streaming.rs
@@ -20,19 +20,18 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use crate::Session;
-use crate::TableProvider;
-
 use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
 use datafusion_common::{DFSchema, Result, plan_err};
 use datafusion_expr::{Expr, SortExpr, TableType};
+use datafusion_physical_expr::equivalence::project_ordering;
 use datafusion_physical_expr::{LexOrdering, create_physical_sort_exprs};
 use datafusion_physical_plan::ExecutionPlan;
 use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
-
-use async_trait::async_trait;
 use log::debug;
 
+use crate::{Session, TableProvider};
+
 /// A [`TableProvider`] that streams a set of [`PartitionStream`]
 #[derive(Debug)]
 pub struct StreamingTable {
@@ -105,7 +104,22 @@ impl TableProvider for StreamingTable {
             let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
             let eqp = state.execution_props();
 
-            create_physical_sort_exprs(&self.sort_order, &df_schema, eqp)?
+            let original_sort_exprs =
+                create_physical_sort_exprs(&self.sort_order, &df_schema, eqp)?;
+
+            if let Some(p) = projection {
+                // When performing a projection, the output columns will not match
+                // the original physical sort expression indices. Also the sort columns
+                // may not be in the output projection. To correct for these issues
+                // we need to project the ordering based on the output schema.
+                let schema = Arc::new(self.schema.project(p)?);
+                LexOrdering::new(original_sort_exprs)
+                    .and_then(|lex_ordering| project_ordering(&lex_ordering, 
&schema))
+                    .map(|lex_ordering| lex_ordering.to_vec())
+                    .unwrap_or_default()
+            } else {
+                original_sort_exprs
+            }
         } else {
             vec![]
         };
diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index 4b74aebdf5..6349ff1cd1 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -29,11 +29,11 @@ use crate::physical_optimizer::test_utils::{
     spr_repartition_exec, stream_exec_ordered, union_exec,
 };
 
-use arrow::compute::SortOptions;
+use arrow::compute::{SortOptions};
 use arrow::datatypes::{DataType, SchemaRef};
 use datafusion_common::config::{ConfigOptions, CsvOptions};
 use datafusion_common::tree_node::{TreeNode, TransformedResult};
-use datafusion_common::{Result,  TableReference};
+use datafusion_common::{create_array, Result, TableReference};
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_datasource::source::DataSourceExec;
 use datafusion_expr_common::operator::Operator;
@@ -58,7 +58,7 @@ use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
 use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion::prelude::*;
-use arrow::array::{Int32Array, RecordBatch};
+use arrow::array::{record_batch, ArrayRef, Int32Array, RecordBatch};
 use arrow::datatypes::{Field};
 use arrow_schema::Schema;
 use datafusion_execution::TaskContext;
@@ -2805,3 +2805,47 @@ async fn test_partial_sort_with_homogeneous_batches() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_sort_with_streaming_table() -> Result<()> {
+    let batch = record_batch!(("a", Int32, [1, 2, 3]), ("b", Int32, [1, 2, 
3]))?;
+
+    let ctx = SessionContext::new();
+
+    let sort_order = vec![
+        SortExpr::new(
+            Expr::Column(datafusion_common::Column::new(
+                Option::<TableReference>::None,
+                "a",
+            )),
+            true,
+            false,
+        ),
+        SortExpr::new(
+            Expr::Column(datafusion_common::Column::new(
+                Option::<TableReference>::None,
+                "b",
+            )),
+            true,
+            false,
+        ),
+    ];
+    let schema = batch.schema();
+    let batches = Arc::new(DummyStreamPartition {
+        schema: schema.clone(),
+        batches: vec![batch],
+    }) as _;
+    let provider = StreamingTable::try_new(schema.clone(), vec![batches])?
+        .with_sort_order(sort_order);
+    ctx.register_table("test_table", Arc::new(provider))?;
+
+    let sql = "SELECT a FROM test_table GROUP BY a ORDER BY a";
+    let results = ctx.sql(sql).await?.collect().await?;
+
+    assert_eq!(results.len(), 1);
+    assert_eq!(results[0].num_columns(), 1);
+    let expected = create_array!(Int32, vec![1, 2, 3]) as ArrayRef;
+    assert_eq!(results[0].column(0), &expected);
+
+    Ok(())
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to