This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e353eb08fb Project sort expressions in StreamingTable (#19719)
e353eb08fb is described below
commit e353eb08fb57570ce87ff29a00743ffcdf40e1a3
Author: Tim Saucer <[email protected]>
AuthorDate: Fri Jan 16 11:01:42 2026 -0500
Project sort expressions in StreamingTable (#19719)
## Which issue does this PR close?
- Closes #19717
## Rationale for this change
If a `StreamingTable` has both physical sort expressions and a
projection, executing the plan fails with an error when the sort
expressions are not included in the projection.
## What changes are included in this PR?
When both a projection and physical sort expressions are present, project
the schema and then project the sort expressions onto the projected schema.
## Are these changes tested?
Unit test added.
## Are there any user-facing changes?
No
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/catalog/src/streaming.rs | 26 ++++++++---
.../tests/physical_optimizer/enforce_sorting.rs | 50 ++++++++++++++++++++--
2 files changed, 67 insertions(+), 9 deletions(-)
diff --git a/datafusion/catalog/src/streaming.rs
b/datafusion/catalog/src/streaming.rs
index 31669171b2..db9596b420 100644
--- a/datafusion/catalog/src/streaming.rs
+++ b/datafusion/catalog/src/streaming.rs
@@ -20,19 +20,18 @@
use std::any::Any;
use std::sync::Arc;
-use crate::Session;
-use crate::TableProvider;
-
use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
use datafusion_common::{DFSchema, Result, plan_err};
use datafusion_expr::{Expr, SortExpr, TableType};
+use datafusion_physical_expr::equivalence::project_ordering;
use datafusion_physical_expr::{LexOrdering, create_physical_sort_exprs};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
-
-use async_trait::async_trait;
use log::debug;
+use crate::{Session, TableProvider};
+
/// A [`TableProvider`] that streams a set of [`PartitionStream`]
#[derive(Debug)]
pub struct StreamingTable {
@@ -105,7 +104,22 @@ impl TableProvider for StreamingTable {
let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
let eqp = state.execution_props();
- create_physical_sort_exprs(&self.sort_order, &df_schema, eqp)?
+ let original_sort_exprs =
+ create_physical_sort_exprs(&self.sort_order, &df_schema, eqp)?;
+
+ if let Some(p) = projection {
+ // When performing a projection, the output columns will not match
+ // the original physical sort expression indices. Also the sort columns
+ // may not be in the output projection. To correct for these issues
+ // we need to project the ordering based on the output schema.
+ let schema = Arc::new(self.schema.project(p)?);
+ LexOrdering::new(original_sort_exprs)
+ .and_then(|lex_ordering| project_ordering(&lex_ordering,
&schema))
+ .map(|lex_ordering| lex_ordering.to_vec())
+ .unwrap_or_default()
+ } else {
+ original_sort_exprs
+ }
} else {
vec![]
};
diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index 4b74aebdf5..6349ff1cd1 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -29,11 +29,11 @@ use crate::physical_optimizer::test_utils::{
spr_repartition_exec, stream_exec_ordered, union_exec,
};
-use arrow::compute::SortOptions;
+use arrow::compute::{SortOptions};
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::config::{ConfigOptions, CsvOptions};
use datafusion_common::tree_node::{TreeNode, TransformedResult};
-use datafusion_common::{Result, TableReference};
+use datafusion_common::{create_array, Result, TableReference};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use datafusion_expr_common::operator::Operator;
@@ -58,7 +58,7 @@ use
datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion::prelude::*;
-use arrow::array::{Int32Array, RecordBatch};
+use arrow::array::{record_batch, ArrayRef, Int32Array, RecordBatch};
use arrow::datatypes::{Field};
use arrow_schema::Schema;
use datafusion_execution::TaskContext;
@@ -2805,3 +2805,47 @@ async fn test_partial_sort_with_homogeneous_batches() ->
Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn test_sort_with_streaming_table() -> Result<()> {
+ let batch = record_batch!(("a", Int32, [1, 2, 3]), ("b", Int32, [1, 2,
3]))?;
+
+ let ctx = SessionContext::new();
+
+ let sort_order = vec![
+ SortExpr::new(
+ Expr::Column(datafusion_common::Column::new(
+ Option::<TableReference>::None,
+ "a",
+ )),
+ true,
+ false,
+ ),
+ SortExpr::new(
+ Expr::Column(datafusion_common::Column::new(
+ Option::<TableReference>::None,
+ "b",
+ )),
+ true,
+ false,
+ ),
+ ];
+ let schema = batch.schema();
+ let batches = Arc::new(DummyStreamPartition {
+ schema: schema.clone(),
+ batches: vec![batch],
+ }) as _;
+ let provider = StreamingTable::try_new(schema.clone(), vec![batches])?
+ .with_sort_order(sort_order);
+ ctx.register_table("test_table", Arc::new(provider))?;
+
+ let sql = "SELECT a FROM test_table GROUP BY a ORDER BY a";
+ let results = ctx.sql(sql).await?.collect().await?;
+
+ assert_eq!(results.len(), 1);
+ assert_eq!(results[0].num_columns(), 1);
+ let expected = create_array!(Int32, vec![1, 2, 3]) as ArrayRef;
+ assert_eq!(results[0].column(0), &expected);
+
+ Ok(())
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]