This is an automated email from the ASF dual-hosted git repository.
xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3dd3d7359f refactor: Move view and stream from `datasource` to
`catalog`, deprecate `View::try_new` (#15260)
3dd3d7359f is described below
commit 3dd3d7359f25e80412b6178fb8c3586fd83505f1
Author: logan-keede <[email protected]>
AuthorDate: Wed Mar 19 16:18:19 2025 +0530
refactor: Move view and stream from `datasource` to `catalog`, deprecate
`View::try_new` (#15260)
* First Iteration
* minor change
* unnecessary addition removal
* sorting weird imports
* fix: doc
* fix:doctest
* apply suggestions
* fix:doctest
* Deprecate View::try_new w/ hint
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
Cargo.lock | 2 +
datafusion/catalog/Cargo.toml | 4 +-
datafusion/catalog/src/lib.rs | 2 +
.../{core/src/datasource => catalog/src}/stream.rs | 8 +-
datafusion/catalog/src/view.rs | 155 +++++++++++++++++++++
datafusion/core/src/datasource/mod.rs | 93 +------------
.../core/src/datasource/{view.rs => view_test.rs} | 147 +------------------
datafusion/core/src/execution/context/mod.rs | 20 ++-
datafusion/physical-expr/src/lib.rs | 4 +-
datafusion/physical-expr/src/physical_expr.rs | 88 ++++++++++++
datafusion/proto/src/logical_plan/mod.rs | 2 +-
11 files changed, 278 insertions(+), 247 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 79910869f0..f2c94e9bfa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1892,8 +1892,10 @@ dependencies = [
"async-trait",
"dashmap",
"datafusion-common",
+ "datafusion-common-runtime",
"datafusion-execution",
"datafusion-expr",
+ "datafusion-physical-expr",
"datafusion-physical-plan",
"datafusion-sql",
"futures",
diff --git a/datafusion/catalog/Cargo.toml b/datafusion/catalog/Cargo.toml
index 4e1555b782..113d688253 100644
--- a/datafusion/catalog/Cargo.toml
+++ b/datafusion/catalog/Cargo.toml
@@ -35,8 +35,10 @@ arrow = { workspace = true }
async-trait = { workspace = true }
dashmap = { workspace = true }
datafusion-common = { workspace = true }
+datafusion-common-runtime = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
+datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-sql = { workspace = true }
futures = { workspace = true }
@@ -44,8 +46,6 @@ itertools = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
parking_lot = { workspace = true }
-
-[dev-dependencies]
tokio = { workspace = true }
[lints]
diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs
index e4ccba8651..7ba97fbc9f 100644
--- a/datafusion/catalog/src/lib.rs
+++ b/datafusion/catalog/src/lib.rs
@@ -49,4 +49,6 @@ pub use r#async::*;
pub use schema::*;
pub use session::*;
pub use table::*;
+pub mod stream;
pub mod streaming;
+pub mod view;
diff --git a/datafusion/core/src/datasource/stream.rs
b/datafusion/catalog/src/stream.rs
similarity index 98%
rename from datafusion/core/src/datasource/stream.rs
rename to datafusion/catalog/src/stream.rs
index ffb4860544..3fb6724907 100644
--- a/datafusion/core/src/datasource/stream.rs
+++ b/datafusion/catalog/src/stream.rs
@@ -25,9 +25,7 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
-use crate::catalog::{TableProvider, TableProviderFactory};
-use crate::datasource::create_ordering;
-
+use crate::{Session, TableProvider, TableProviderFactory};
use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
use arrow::datatypes::SchemaRef;
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError,
Result};
@@ -35,13 +33,13 @@ use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
+use datafusion_physical_expr::create_ordering;
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use async_trait::async_trait;
-use datafusion_catalog::Session;
use futures::StreamExt;
/// A [`TableProviderFactory`] for [`StreamTable`]
@@ -292,7 +290,7 @@ impl StreamConfig {
/// data stored in object storage, should instead consider [`ListingTable`].
///
/// [Hadoop]: https://hadoop.apache.org/
-/// [`ListingTable`]: crate::datasource::listing::ListingTable
+/// [`ListingTable`]:
https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
#[derive(Debug)]
pub struct StreamTable(Arc<StreamConfig>);
diff --git a/datafusion/catalog/src/view.rs b/datafusion/catalog/src/view.rs
new file mode 100644
index 0000000000..8dfb79718c
--- /dev/null
+++ b/datafusion/catalog/src/view.rs
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! View data source which uses a LogicalPlan as it's input.
+
+use std::{any::Any, borrow::Cow, sync::Arc};
+
+use crate::Session;
+use crate::TableProvider;
+
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+use datafusion_common::error::Result;
+use datafusion_common::Column;
+use datafusion_expr::TableType;
+use datafusion_expr::{Expr, LogicalPlan};
+use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
+use datafusion_physical_plan::ExecutionPlan;
+
+/// An implementation of `TableProvider` that uses another logical plan.
+#[derive(Debug)]
+pub struct ViewTable {
+ /// LogicalPlan of the view
+ logical_plan: LogicalPlan,
+ /// File fields + partition columns
+ table_schema: SchemaRef,
+ /// SQL used to create the view, if available
+ definition: Option<String>,
+}
+
+impl ViewTable {
+ /// Create new view that is executed at query runtime.
+ ///
+ /// Takes a `LogicalPlan` and optionally the SQL text of the `CREATE`
+ /// statement.
+ ///
+ /// Notes: the `LogicalPlan` is not validated or type coerced. If this is
+ /// needed it should be done after calling this function.
+ pub fn new(logical_plan: LogicalPlan, definition: Option<String>) -> Self {
+ let table_schema = logical_plan.schema().as_ref().to_owned().into();
+ Self {
+ logical_plan,
+ table_schema,
+ definition,
+ }
+ }
+
+ #[deprecated(
+ since = "47.0.0",
+ note = "Use `ViewTable::new` instead and apply TypeCoercion to the
logical plan if needed"
+ )]
+ pub fn try_new(
+ logical_plan: LogicalPlan,
+ definition: Option<String>,
+ ) -> Result<Self> {
+ Ok(Self::new(logical_plan, definition))
+ }
+
+ /// Get definition ref
+ pub fn definition(&self) -> Option<&String> {
+ self.definition.as_ref()
+ }
+
+ /// Get logical_plan ref
+ pub fn logical_plan(&self) -> &LogicalPlan {
+ &self.logical_plan
+ }
+}
+
+#[async_trait]
+impl TableProvider for ViewTable {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
+ Some(Cow::Borrowed(&self.logical_plan))
+ }
+
+ fn schema(&self) -> SchemaRef {
+ Arc::clone(&self.table_schema)
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::View
+ }
+
+ fn get_table_definition(&self) -> Option<&str> {
+ self.definition.as_deref()
+ }
+ fn supports_filters_pushdown(
+ &self,
+ filters: &[&Expr],
+ ) -> Result<Vec<TableProviderFilterPushDown>> {
+ // A filter is added on the View when given
+ Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
+ }
+
+ async fn scan(
+ &self,
+ state: &dyn Session,
+ projection: Option<&Vec<usize>>,
+ filters: &[Expr],
+ limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
+ let plan = self.logical_plan().clone();
+ let mut plan = LogicalPlanBuilder::from(plan);
+
+ if let Some(filter) = filter {
+ plan = plan.filter(filter)?;
+ }
+
+ let mut plan = if let Some(projection) = projection {
+ // avoiding adding a redundant projection (e.g. SELECT * FROM view)
+ let current_projection =
+ (0..plan.schema().fields().len()).collect::<Vec<usize>>();
+ if projection == ¤t_projection {
+ plan
+ } else {
+ let fields: Vec<Expr> = projection
+ .iter()
+ .map(|i| {
+ Expr::Column(Column::from(
+ self.logical_plan.schema().qualified_field(*i),
+ ))
+ })
+ .collect();
+ plan.project(fields)?
+ }
+ } else {
+ plan
+ };
+
+ if let Some(limit) = limit {
+ plan = plan.limit(0, Some(limit))?;
+ }
+
+ state.create_physical_plan(&plan.build()?).await
+ }
+}
diff --git a/datafusion/core/src/datasource/mod.rs
b/datafusion/core/src/datasource/mod.rs
index 18a1318dd4..80783b4892 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -30,9 +30,10 @@ pub mod memory;
pub mod physical_plan;
pub mod provider;
mod statistics;
-pub mod stream;
-pub mod view;
+mod view_test;
+pub use datafusion_catalog::stream;
+pub use datafusion_catalog::view;
pub use datafusion_datasource::schema_adapter;
pub use datafusion_datasource::source;
@@ -45,95 +46,9 @@ pub use self::view::ViewTable;
pub use crate::catalog::TableProvider;
pub use crate::logical_expr::TableType;
pub use datafusion_execution::object_store;
+pub use datafusion_physical_expr::create_ordering;
pub use statistics::get_statistics_with_limit;
-use arrow::compute::SortOptions;
-use arrow::datatypes::Schema;
-use datafusion_common::{plan_err, Result};
-use datafusion_expr::{Expr, SortExpr};
-use datafusion_physical_expr::{expressions, LexOrdering, PhysicalSortExpr};
-
-/// Converts logical sort expressions to physical sort expressions
-///
-/// This function transforms a collection of logical sort expressions into
their physical
-/// representation that can be used during query execution.
-///
-/// # Arguments
-///
-/// * `schema` - The schema containing column definitions
-/// * `sort_order` - A collection of logical sort expressions grouped into
lexicographic orderings
-///
-/// # Returns
-///
-/// A vector of lexicographic orderings for physical execution, or an error if
the transformation fails
-///
-/// # Examples
-///
-/// ```
-/// // Create orderings from columns "id" and "name"
-/// # use arrow::datatypes::{Schema, Field, DataType};
-/// # use datafusion::datasource::create_ordering;
-/// # use datafusion_common::Column;
-/// # use datafusion_expr::{Expr, SortExpr};
-/// #
-/// // Create a schema with two fields
-/// let schema = Schema::new(vec![
-/// Field::new("id", DataType::Int32, false),
-/// Field::new("name", DataType::Utf8, false),
-/// ]);
-///
-/// let sort_exprs = vec![
-/// vec![
-/// SortExpr { expr: Expr::Column(Column::new(Some("t"), "id")), asc:
true, nulls_first: false }
-/// ],
-/// vec![
-/// SortExpr { expr: Expr::Column(Column::new(Some("t"), "name")),
asc: false, nulls_first: true }
-/// ]
-/// ];
-/// let result = create_ordering(&schema, &sort_exprs).unwrap();
-/// ```
-pub fn create_ordering(
- schema: &Schema,
- sort_order: &[Vec<SortExpr>],
-) -> Result<Vec<LexOrdering>> {
- let mut all_sort_orders = vec![];
-
- for (group_idx, exprs) in sort_order.iter().enumerate() {
- // Construct PhysicalSortExpr objects from Expr objects:
- let mut sort_exprs = LexOrdering::default();
- for (expr_idx, sort) in exprs.iter().enumerate() {
- match &sort.expr {
- Expr::Column(col) => match expressions::col(&col.name, schema)
{
- Ok(expr) => {
- sort_exprs.push(PhysicalSortExpr {
- expr,
- options: SortOptions {
- descending: !sort.asc,
- nulls_first: sort.nulls_first,
- },
- });
- }
- // Cannot find expression in the projected_schema, stop
iterating
- // since rest of the orderings are violated
- Err(_) => break,
- },
- expr => {
- return plan_err!(
- "Expected single column reference in
sort_order[{}][{}], got {}",
- group_idx,
- expr_idx,
- expr
- );
- }
- }
- }
- if !sort_exprs.is_empty() {
- all_sort_orders.push(sort_exprs);
- }
- }
- Ok(all_sort_orders)
-}
-
#[cfg(all(test, feature = "parquet"))]
mod tests {
diff --git a/datafusion/core/src/datasource/view.rs
b/datafusion/core/src/datasource/view_test.rs
similarity index 78%
rename from datafusion/core/src/datasource/view.rs
rename to datafusion/core/src/datasource/view_test.rs
index e4f57b0d97..c3dd5a2dd9 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view_test.rs
@@ -17,157 +17,14 @@
//! View data source which uses a LogicalPlan as it's input.
-use std::{any::Any, borrow::Cow, sync::Arc};
-
-use crate::{
- error::Result,
- logical_expr::{Expr, LogicalPlan},
- physical_plan::ExecutionPlan,
-};
-use arrow::datatypes::SchemaRef;
-use async_trait::async_trait;
-use datafusion_catalog::Session;
-use datafusion_common::config::ConfigOptions;
-use datafusion_common::Column;
-use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
-use datafusion_optimizer::analyzer::type_coercion::TypeCoercion;
-use datafusion_optimizer::Analyzer;
-
-use crate::datasource::{TableProvider, TableType};
-
-/// An implementation of `TableProvider` that uses another logical plan.
-#[derive(Debug)]
-pub struct ViewTable {
- /// LogicalPlan of the view
- logical_plan: LogicalPlan,
- /// File fields + partition columns
- table_schema: SchemaRef,
- /// SQL used to create the view, if available
- definition: Option<String>,
-}
-
-impl ViewTable {
- /// Create new view that is executed at query runtime.
- /// Takes a `LogicalPlan` and an optional create statement as input.
- pub fn try_new(
- logical_plan: LogicalPlan,
- definition: Option<String>,
- ) -> Result<Self> {
- let logical_plan = Self::apply_required_rule(logical_plan)?;
- let table_schema = logical_plan.schema().as_ref().to_owned().into();
-
- let view = Self {
- logical_plan,
- table_schema,
- definition,
- };
-
- Ok(view)
- }
-
- fn apply_required_rule(logical_plan: LogicalPlan) -> Result<LogicalPlan> {
- let options = ConfigOptions::default();
-
Analyzer::with_rules(vec![Arc::new(TypeCoercion::new())]).execute_and_check(
- logical_plan,
- &options,
- |_, _| {},
- )
- }
-
- /// Get definition ref
- pub fn definition(&self) -> Option<&String> {
- self.definition.as_ref()
- }
-
- /// Get logical_plan ref
- pub fn logical_plan(&self) -> &LogicalPlan {
- &self.logical_plan
- }
-}
-
-#[async_trait]
-impl TableProvider for ViewTable {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
- Some(Cow::Borrowed(&self.logical_plan))
- }
-
- fn schema(&self) -> SchemaRef {
- Arc::clone(&self.table_schema)
- }
-
- fn table_type(&self) -> TableType {
- TableType::View
- }
-
- fn get_table_definition(&self) -> Option<&str> {
- self.definition.as_deref()
- }
- fn supports_filters_pushdown(
- &self,
- filters: &[&Expr],
- ) -> Result<Vec<TableProviderFilterPushDown>> {
- // A filter is added on the View when given
- Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
- }
-
- async fn scan(
- &self,
- state: &dyn Session,
- projection: Option<&Vec<usize>>,
- filters: &[Expr],
- limit: Option<usize>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
- let plan = self.logical_plan().clone();
- let mut plan = LogicalPlanBuilder::from(plan);
-
- if let Some(filter) = filter {
- plan = plan.filter(filter)?;
- }
-
- let mut plan = if let Some(projection) = projection {
- // avoiding adding a redundant projection (e.g. SELECT * FROM view)
- let current_projection =
- (0..plan.schema().fields().len()).collect::<Vec<usize>>();
- if projection == ¤t_projection {
- plan
- } else {
- let fields: Vec<Expr> = projection
- .iter()
- .map(|i| {
- Expr::Column(Column::from(
- self.logical_plan.schema().qualified_field(*i),
- ))
- })
- .collect();
- plan.project(fields)?
- }
- } else {
- plan
- };
-
- if let Some(limit) = limit {
- plan = plan.limit(0, Some(limit))?;
- }
-
- state.create_physical_plan(&plan.build()?).await
- }
-}
-
#[cfg(test)]
mod tests {
- use datafusion_expr::{col, lit};
-
+ use crate::error::Result;
use crate::execution::options::ParquetReadOptions;
use crate::prelude::SessionContext;
use crate::test_util::parquet_test_data;
use crate::{assert_batches_eq, execution::context::SessionConfig};
-
- use super::*;
+ use datafusion_expr::{col, lit};
#[tokio::test]
async fn issue_3242() -> Result<()> {
diff --git a/datafusion/core/src/execution/context/mod.rs
b/datafusion/core/src/execution/context/mod.rs
index beefca6d57..faf689179e 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -75,9 +75,12 @@ use chrono::{DateTime, Utc};
use datafusion_catalog::{
DynamicFileCatalog, SessionStore, TableFunction, TableFunctionImpl,
UrlTableFactory,
};
+use datafusion_common::config::ConfigOptions;
pub use datafusion_execution::config::SessionConfig;
pub use datafusion_execution::TaskContext;
pub use datafusion_expr::execution_props::ExecutionProps;
+use datafusion_optimizer::analyzer::type_coercion::TypeCoercion;
+use datafusion_optimizer::Analyzer;
use datafusion_optimizer::{AnalyzerRule, OptimizerRule};
use object_store::ObjectStore;
use parking_lot::RwLock;
@@ -856,6 +859,16 @@ impl SessionContext {
}
}
+ /// Applies the `TypeCoercion` rewriter to the logical plan.
+ fn apply_type_coercion(logical_plan: LogicalPlan) -> Result<LogicalPlan> {
+ let options = ConfigOptions::default();
+
Analyzer::with_rules(vec![Arc::new(TypeCoercion::new())]).execute_and_check(
+ logical_plan,
+ &options,
+ |_, _| {},
+ )
+ }
+
async fn create_view(&self, cmd: CreateView) -> Result<DataFrame> {
let CreateView {
name,
@@ -874,13 +887,14 @@ impl SessionContext {
match (or_replace, view) {
(true, Ok(_)) => {
self.deregister_table(name.clone())?;
- let table = Arc::new(ViewTable::try_new((*input).clone(),
definition)?);
-
+ let input = Self::apply_type_coercion(input.as_ref().clone())?;
+ let table = Arc::new(ViewTable::new(input, definition));
self.register_table(name, table)?;
self.return_empty_dataframe()
}
(_, Err(_)) => {
- let table = Arc::new(ViewTable::try_new((*input).clone(),
definition)?);
+ let input = Self::apply_type_coercion(input.as_ref().clone())?;
+ let table = Arc::new(ViewTable::new(input, definition));
self.register_table(name, table)?;
self.return_empty_dataframe()
}
diff --git a/datafusion/physical-expr/src/lib.rs
b/datafusion/physical-expr/src/lib.rs
index e52c686cd9..9abaeae440 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -54,8 +54,8 @@ pub use equivalence::{
};
pub use partitioning::{Distribution, Partitioning};
pub use physical_expr::{
- physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
- PhysicalExprRef,
+ create_ordering, physical_exprs_bag_equal, physical_exprs_contains,
+ physical_exprs_equal, PhysicalExprRef,
};
pub use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
diff --git a/datafusion/physical-expr/src/physical_expr.rs
b/datafusion/physical-expr/src/physical_expr.rs
index a4184845a0..2221bc980f 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -58,6 +58,94 @@ pub fn physical_exprs_bag_equal(
multi_set_lhs == multi_set_rhs
}
+use crate::{expressions, LexOrdering, PhysicalSortExpr};
+use arrow::compute::SortOptions;
+use arrow::datatypes::Schema;
+use datafusion_common::plan_err;
+use datafusion_common::Result;
+use datafusion_expr::{Expr, SortExpr};
+
+/// Converts logical sort expressions to physical sort expressions
+///
+/// This function transforms a collection of logical sort expressions into
their physical
+/// representation that can be used during query execution.
+///
+/// # Arguments
+///
+/// * `schema` - The schema containing column definitions
+/// * `sort_order` - A collection of logical sort expressions grouped into
lexicographic orderings
+///
+/// # Returns
+///
+/// A vector of lexicographic orderings for physical execution, or an error if
the transformation fails
+///
+/// # Examples
+///
+/// ```
+/// // Create orderings from columns "id" and "name"
+/// # use arrow::datatypes::{Schema, Field, DataType};
+/// # use datafusion_physical_expr::create_ordering;
+/// # use datafusion_common::Column;
+/// # use datafusion_expr::{Expr, SortExpr};
+/// #
+/// // Create a schema with two fields
+/// let schema = Schema::new(vec![
+/// Field::new("id", DataType::Int32, false),
+/// Field::new("name", DataType::Utf8, false),
+/// ]);
+///
+/// let sort_exprs = vec![
+/// vec![
+/// SortExpr { expr: Expr::Column(Column::new(Some("t"), "id")), asc:
true, nulls_first: false }
+/// ],
+/// vec![
+/// SortExpr { expr: Expr::Column(Column::new(Some("t"), "name")),
asc: false, nulls_first: true }
+/// ]
+/// ];
+/// let result = create_ordering(&schema, &sort_exprs).unwrap();
+/// ```
+pub fn create_ordering(
+ schema: &Schema,
+ sort_order: &[Vec<SortExpr>],
+) -> Result<Vec<LexOrdering>> {
+ let mut all_sort_orders = vec![];
+
+ for (group_idx, exprs) in sort_order.iter().enumerate() {
+ // Construct PhysicalSortExpr objects from Expr objects:
+ let mut sort_exprs = LexOrdering::default();
+ for (expr_idx, sort) in exprs.iter().enumerate() {
+ match &sort.expr {
+ Expr::Column(col) => match expressions::col(&col.name, schema)
{
+ Ok(expr) => {
+ sort_exprs.push(PhysicalSortExpr {
+ expr,
+ options: SortOptions {
+ descending: !sort.asc,
+ nulls_first: sort.nulls_first,
+ },
+ });
+ }
+ // Cannot find expression in the projected_schema, stop
iterating
+ // since rest of the orderings are violated
+ Err(_) => break,
+ },
+ expr => {
+ return plan_err!(
+ "Expected single column reference in
sort_order[{}][{}], got {}",
+ group_idx,
+ expr_idx,
+ expr
+ );
+ }
+ }
+ }
+ if !sort_exprs.is_empty() {
+ all_sort_orders.push(sort_exprs);
+ }
+ }
+ Ok(all_sort_orders)
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/datafusion/proto/src/logical_plan/mod.rs
b/datafusion/proto/src/logical_plan/mod.rs
index 148856cd10..c65569ef1c 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -867,7 +867,7 @@ impl AsLogicalPlan for LogicalPlanNode {
None
};
- let provider = ViewTable::try_new(input, definition)?;
+ let provider = ViewTable::new(input, definition);
let table_name =
from_table_reference(scan.table_name.as_ref(),
"ViewScan")?;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]