This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3dd3d7359f refactor: Move view and stream from `datasource` to `catalog`, deprecate `View::try_new` (#15260)
3dd3d7359f is described below

commit 3dd3d7359f25e80412b6178fb8c3586fd83505f1
Author: logan-keede <[email protected]>
AuthorDate: Wed Mar 19 16:18:19 2025 +0530

    refactor: Move view and stream from `datasource` to `catalog`, deprecate `View::try_new` (#15260)
    
    * First Iteration
    
    * minor change
    
    * unnecessary addition removal
    
    * sorting weird imports
    
    * fix: doc
    
    * fix:doctest
    
    * apply suggestions
    
    * fix:doctest
    
    * Deprecate View::try_new w/ hint
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 Cargo.lock                                         |   2 +
 datafusion/catalog/Cargo.toml                      |   4 +-
 datafusion/catalog/src/lib.rs                      |   2 +
 .../{core/src/datasource => catalog/src}/stream.rs |   8 +-
 datafusion/catalog/src/view.rs                     | 155 +++++++++++++++++++++
 datafusion/core/src/datasource/mod.rs              |  93 +------------
 .../core/src/datasource/{view.rs => view_test.rs}  | 147 +------------------
 datafusion/core/src/execution/context/mod.rs       |  20 ++-
 datafusion/physical-expr/src/lib.rs                |   4 +-
 datafusion/physical-expr/src/physical_expr.rs      |  88 ++++++++++++
 datafusion/proto/src/logical_plan/mod.rs           |   2 +-
 11 files changed, 278 insertions(+), 247 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 79910869f0..f2c94e9bfa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1892,8 +1892,10 @@ dependencies = [
  "async-trait",
  "dashmap",
  "datafusion-common",
+ "datafusion-common-runtime",
  "datafusion-execution",
  "datafusion-expr",
+ "datafusion-physical-expr",
  "datafusion-physical-plan",
  "datafusion-sql",
  "futures",
diff --git a/datafusion/catalog/Cargo.toml b/datafusion/catalog/Cargo.toml
index 4e1555b782..113d688253 100644
--- a/datafusion/catalog/Cargo.toml
+++ b/datafusion/catalog/Cargo.toml
@@ -35,8 +35,10 @@ arrow = { workspace = true }
 async-trait = { workspace = true }
 dashmap = { workspace = true }
 datafusion-common = { workspace = true }
+datafusion-common-runtime = { workspace = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
+datafusion-physical-expr = { workspace = true }
 datafusion-physical-plan = { workspace = true }
 datafusion-sql = { workspace = true }
 futures = { workspace = true }
@@ -44,8 +46,6 @@ itertools = { workspace = true }
 log = { workspace = true }
 object_store = { workspace = true }
 parking_lot = { workspace = true }
-
-[dev-dependencies]
 tokio = { workspace = true }
 
 [lints]
diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs
index e4ccba8651..7ba97fbc9f 100644
--- a/datafusion/catalog/src/lib.rs
+++ b/datafusion/catalog/src/lib.rs
@@ -49,4 +49,6 @@ pub use r#async::*;
 pub use schema::*;
 pub use session::*;
 pub use table::*;
+pub mod stream;
 pub mod streaming;
+pub mod view;
diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/catalog/src/stream.rs
similarity index 98%
rename from datafusion/core/src/datasource/stream.rs
rename to datafusion/catalog/src/stream.rs
index ffb4860544..3fb6724907 100644
--- a/datafusion/core/src/datasource/stream.rs
+++ b/datafusion/catalog/src/stream.rs
@@ -25,9 +25,7 @@ use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
 
-use crate::catalog::{TableProvider, TableProviderFactory};
-use crate::datasource::create_ordering;
-
+use crate::{Session, TableProvider, TableProviderFactory};
 use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
@@ -35,13 +33,13 @@ use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
+use datafusion_physical_expr::create_ordering;
 use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
 use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
 use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 
 use async_trait::async_trait;
-use datafusion_catalog::Session;
 use futures::StreamExt;
 
 /// A [`TableProviderFactory`] for [`StreamTable`]
@@ -292,7 +290,7 @@ impl StreamConfig {
 /// data stored in object storage, should instead consider [`ListingTable`].
 ///
 /// [Hadoop]: https://hadoop.apache.org/
-/// [`ListingTable`]: crate::datasource::listing::ListingTable
+/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
 #[derive(Debug)]
 pub struct StreamTable(Arc<StreamConfig>);
 
diff --git a/datafusion/catalog/src/view.rs b/datafusion/catalog/src/view.rs
new file mode 100644
index 0000000000..8dfb79718c
--- /dev/null
+++ b/datafusion/catalog/src/view.rs
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! View data source which uses a LogicalPlan as its input.
+
+use std::{any::Any, borrow::Cow, sync::Arc};
+
+use crate::Session;
+use crate::TableProvider;
+
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+use datafusion_common::error::Result;
+use datafusion_common::Column;
+use datafusion_expr::TableType;
+use datafusion_expr::{Expr, LogicalPlan};
+use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
+use datafusion_physical_plan::ExecutionPlan;
+
+/// An implementation of `TableProvider` that uses another logical plan.
+#[derive(Debug)]
+pub struct ViewTable {
+    /// LogicalPlan of the view
+    logical_plan: LogicalPlan,
+    /// File fields + partition columns
+    table_schema: SchemaRef,
+    /// SQL used to create the view, if available
+    definition: Option<String>,
+}
+
+impl ViewTable {
+    /// Create new view that is executed at query runtime.
+    ///
+    /// Takes a `LogicalPlan` and optionally the SQL text of the `CREATE`
+    /// statement.
+    ///
+    /// Notes: the `LogicalPlan` is not validated or type coerced. If this is
+    /// needed it should be done after calling this function.
+    pub fn new(logical_plan: LogicalPlan, definition: Option<String>) -> Self {
+        let table_schema = logical_plan.schema().as_ref().to_owned().into();
+        Self {
+            logical_plan,
+            table_schema,
+            definition,
+        }
+    }
+
+    #[deprecated(
+        since = "47.0.0",
+        note = "Use `ViewTable::new` instead and apply TypeCoercion to the logical plan if needed"
+    )]
+    pub fn try_new(
+        logical_plan: LogicalPlan,
+        definition: Option<String>,
+    ) -> Result<Self> {
+        Ok(Self::new(logical_plan, definition))
+    }
+
+    /// Get definition ref
+    pub fn definition(&self) -> Option<&String> {
+        self.definition.as_ref()
+    }
+
+    /// Get logical_plan ref
+    pub fn logical_plan(&self) -> &LogicalPlan {
+        &self.logical_plan
+    }
+}
+
+#[async_trait]
+impl TableProvider for ViewTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
+        Some(Cow::Borrowed(&self.logical_plan))
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.table_schema)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    fn get_table_definition(&self) -> Option<&str> {
+        self.definition.as_deref()
+    }
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> Result<Vec<TableProviderFilterPushDown>> {
+        // A filter is added on the View when given
+        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
+    }
+
+    async fn scan(
+        &self,
+        state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
+        let plan = self.logical_plan().clone();
+        let mut plan = LogicalPlanBuilder::from(plan);
+
+        if let Some(filter) = filter {
+            plan = plan.filter(filter)?;
+        }
+
+        let mut plan = if let Some(projection) = projection {
+            // avoiding adding a redundant projection (e.g. SELECT * FROM view)
+            let current_projection =
+                (0..plan.schema().fields().len()).collect::<Vec<usize>>();
+            if projection == &current_projection {
+                plan
+            } else {
+                let fields: Vec<Expr> = projection
+                    .iter()
+                    .map(|i| {
+                        Expr::Column(Column::from(
+                            self.logical_plan.schema().qualified_field(*i),
+                        ))
+                    })
+                    .collect();
+                plan.project(fields)?
+            }
+        } else {
+            plan
+        };
+
+        if let Some(limit) = limit {
+            plan = plan.limit(0, Some(limit))?;
+        }
+
+        state.create_physical_plan(&plan.build()?).await
+    }
+}
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 18a1318dd4..80783b4892 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -30,9 +30,10 @@ pub mod memory;
 pub mod physical_plan;
 pub mod provider;
 mod statistics;
-pub mod stream;
-pub mod view;
+mod view_test;
 
+pub use datafusion_catalog::stream;
+pub use datafusion_catalog::view;
 pub use datafusion_datasource::schema_adapter;
 pub use datafusion_datasource::source;
 
@@ -45,95 +46,9 @@ pub use self::view::ViewTable;
 pub use crate::catalog::TableProvider;
 pub use crate::logical_expr::TableType;
 pub use datafusion_execution::object_store;
+pub use datafusion_physical_expr::create_ordering;
 pub use statistics::get_statistics_with_limit;
 
-use arrow::compute::SortOptions;
-use arrow::datatypes::Schema;
-use datafusion_common::{plan_err, Result};
-use datafusion_expr::{Expr, SortExpr};
-use datafusion_physical_expr::{expressions, LexOrdering, PhysicalSortExpr};
-
-/// Converts logical sort expressions to physical sort expressions
-///
-/// This function transforms a collection of logical sort expressions into their physical
-/// representation that can be used during query execution.
-///
-/// # Arguments
-///
-/// * `schema` - The schema containing column definitions
-/// * `sort_order` - A collection of logical sort expressions grouped into lexicographic orderings
-///
-/// # Returns
-///
-/// A vector of lexicographic orderings for physical execution, or an error if the transformation fails
-///
-/// # Examples
-///
-/// ```
-/// // Create orderings from columns "id" and "name"
-/// # use arrow::datatypes::{Schema, Field, DataType};
-/// # use datafusion::datasource::create_ordering;
-/// # use datafusion_common::Column;
-/// # use datafusion_expr::{Expr, SortExpr};
-/// #
-/// // Create a schema with two fields
-/// let schema = Schema::new(vec![
-///     Field::new("id", DataType::Int32, false),
-///     Field::new("name", DataType::Utf8, false),
-/// ]);
-///
-/// let sort_exprs = vec![
-///     vec![
-///         SortExpr { expr: Expr::Column(Column::new(Some("t"), "id")), asc: true, nulls_first: false }
-///     ],
-///     vec![
-///         SortExpr { expr: Expr::Column(Column::new(Some("t"), "name")), asc: false, nulls_first: true }
-///     ]
-/// ];
-/// let result = create_ordering(&schema, &sort_exprs).unwrap();
-/// ```
-pub fn create_ordering(
-    schema: &Schema,
-    sort_order: &[Vec<SortExpr>],
-) -> Result<Vec<LexOrdering>> {
-    let mut all_sort_orders = vec![];
-
-    for (group_idx, exprs) in sort_order.iter().enumerate() {
-        // Construct PhysicalSortExpr objects from Expr objects:
-        let mut sort_exprs = LexOrdering::default();
-        for (expr_idx, sort) in exprs.iter().enumerate() {
-            match &sort.expr {
-                Expr::Column(col) => match expressions::col(&col.name, schema) {
-                    Ok(expr) => {
-                        sort_exprs.push(PhysicalSortExpr {
-                            expr,
-                            options: SortOptions {
-                                descending: !sort.asc,
-                                nulls_first: sort.nulls_first,
-                            },
-                        });
-                    }
-                    // Cannot find expression in the projected_schema, stop iterating
-                    // since rest of the orderings are violated
-                    Err(_) => break,
-                },
-                expr => {
-                    return plan_err!(
-                        "Expected single column reference in sort_order[{}][{}], got {}",
-                        group_idx,
-                        expr_idx,
-                        expr
-                    );
-                }
-            }
-        }
-        if !sort_exprs.is_empty() {
-            all_sort_orders.push(sort_exprs);
-        }
-    }
-    Ok(all_sort_orders)
-}
-
 #[cfg(all(test, feature = "parquet"))]
 mod tests {
 
diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view_test.rs
similarity index 78%
rename from datafusion/core/src/datasource/view.rs
rename to datafusion/core/src/datasource/view_test.rs
index e4f57b0d97..c3dd5a2dd9 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view_test.rs
@@ -17,157 +17,14 @@
 
 //! View data source which uses a LogicalPlan as it's input.
 
-use std::{any::Any, borrow::Cow, sync::Arc};
-
-use crate::{
-    error::Result,
-    logical_expr::{Expr, LogicalPlan},
-    physical_plan::ExecutionPlan,
-};
-use arrow::datatypes::SchemaRef;
-use async_trait::async_trait;
-use datafusion_catalog::Session;
-use datafusion_common::config::ConfigOptions;
-use datafusion_common::Column;
-use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
-use datafusion_optimizer::analyzer::type_coercion::TypeCoercion;
-use datafusion_optimizer::Analyzer;
-
-use crate::datasource::{TableProvider, TableType};
-
-/// An implementation of `TableProvider` that uses another logical plan.
-#[derive(Debug)]
-pub struct ViewTable {
-    /// LogicalPlan of the view
-    logical_plan: LogicalPlan,
-    /// File fields + partition columns
-    table_schema: SchemaRef,
-    /// SQL used to create the view, if available
-    definition: Option<String>,
-}
-
-impl ViewTable {
-    /// Create new view that is executed at query runtime.
-    /// Takes a `LogicalPlan` and an optional create statement as input.
-    pub fn try_new(
-        logical_plan: LogicalPlan,
-        definition: Option<String>,
-    ) -> Result<Self> {
-        let logical_plan = Self::apply_required_rule(logical_plan)?;
-        let table_schema = logical_plan.schema().as_ref().to_owned().into();
-
-        let view = Self {
-            logical_plan,
-            table_schema,
-            definition,
-        };
-
-        Ok(view)
-    }
-
-    fn apply_required_rule(logical_plan: LogicalPlan) -> Result<LogicalPlan> {
-        let options = ConfigOptions::default();
-        Analyzer::with_rules(vec![Arc::new(TypeCoercion::new())]).execute_and_check(
-            logical_plan,
-            &options,
-            |_, _| {},
-        )
-    }
-
-    /// Get definition ref
-    pub fn definition(&self) -> Option<&String> {
-        self.definition.as_ref()
-    }
-
-    /// Get logical_plan ref
-    pub fn logical_plan(&self) -> &LogicalPlan {
-        &self.logical_plan
-    }
-}
-
-#[async_trait]
-impl TableProvider for ViewTable {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
-        Some(Cow::Borrowed(&self.logical_plan))
-    }
-
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.table_schema)
-    }
-
-    fn table_type(&self) -> TableType {
-        TableType::View
-    }
-
-    fn get_table_definition(&self) -> Option<&str> {
-        self.definition.as_deref()
-    }
-    fn supports_filters_pushdown(
-        &self,
-        filters: &[&Expr],
-    ) -> Result<Vec<TableProviderFilterPushDown>> {
-        // A filter is added on the View when given
-        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
-    }
-
-    async fn scan(
-        &self,
-        state: &dyn Session,
-        projection: Option<&Vec<usize>>,
-        filters: &[Expr],
-        limit: Option<usize>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
-        let plan = self.logical_plan().clone();
-        let mut plan = LogicalPlanBuilder::from(plan);
-
-        if let Some(filter) = filter {
-            plan = plan.filter(filter)?;
-        }
-
-        let mut plan = if let Some(projection) = projection {
-            // avoiding adding a redundant projection (e.g. SELECT * FROM view)
-            let current_projection =
-                (0..plan.schema().fields().len()).collect::<Vec<usize>>();
-            if projection == &current_projection {
-                plan
-            } else {
-                let fields: Vec<Expr> = projection
-                    .iter()
-                    .map(|i| {
-                        Expr::Column(Column::from(
-                            self.logical_plan.schema().qualified_field(*i),
-                        ))
-                    })
-                    .collect();
-                plan.project(fields)?
-            }
-        } else {
-            plan
-        };
-
-        if let Some(limit) = limit {
-            plan = plan.limit(0, Some(limit))?;
-        }
-
-        state.create_physical_plan(&plan.build()?).await
-    }
-}
-
 #[cfg(test)]
 mod tests {
-    use datafusion_expr::{col, lit};
-
+    use crate::error::Result;
     use crate::execution::options::ParquetReadOptions;
     use crate::prelude::SessionContext;
     use crate::test_util::parquet_test_data;
     use crate::{assert_batches_eq, execution::context::SessionConfig};
-
-    use super::*;
+    use datafusion_expr::{col, lit};
 
     #[tokio::test]
     async fn issue_3242() -> Result<()> {
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index beefca6d57..faf689179e 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -75,9 +75,12 @@ use chrono::{DateTime, Utc};
 use datafusion_catalog::{
     DynamicFileCatalog, SessionStore, TableFunction, TableFunctionImpl, UrlTableFactory,
 };
+use datafusion_common::config::ConfigOptions;
 pub use datafusion_execution::config::SessionConfig;
 pub use datafusion_execution::TaskContext;
 pub use datafusion_expr::execution_props::ExecutionProps;
+use datafusion_optimizer::analyzer::type_coercion::TypeCoercion;
+use datafusion_optimizer::Analyzer;
 use datafusion_optimizer::{AnalyzerRule, OptimizerRule};
 use object_store::ObjectStore;
 use parking_lot::RwLock;
@@ -856,6 +859,16 @@ impl SessionContext {
         }
     }
 
+    /// Applies the `TypeCoercion` rewriter to the logical plan.
+    fn apply_type_coercion(logical_plan: LogicalPlan) -> Result<LogicalPlan> {
+        let options = ConfigOptions::default();
+        Analyzer::with_rules(vec![Arc::new(TypeCoercion::new())]).execute_and_check(
+            logical_plan,
+            &options,
+            |_, _| {},
+        )
+    }
+
     async fn create_view(&self, cmd: CreateView) -> Result<DataFrame> {
         let CreateView {
             name,
@@ -874,13 +887,14 @@ impl SessionContext {
         match (or_replace, view) {
             (true, Ok(_)) => {
                 self.deregister_table(name.clone())?;
-                let table = Arc::new(ViewTable::try_new((*input).clone(), definition)?);
-
+                let input = Self::apply_type_coercion(input.as_ref().clone())?;
+                let table = Arc::new(ViewTable::new(input, definition));
                 self.register_table(name, table)?;
                 self.return_empty_dataframe()
             }
             (_, Err(_)) => {
-                let table = Arc::new(ViewTable::try_new((*input).clone(), definition)?);
+                let input = Self::apply_type_coercion(input.as_ref().clone())?;
+                let table = Arc::new(ViewTable::new(input, definition));
                 self.register_table(name, table)?;
                 self.return_empty_dataframe()
             }
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index e52c686cd9..9abaeae440 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -54,8 +54,8 @@ pub use equivalence::{
 };
 pub use partitioning::{Distribution, Partitioning};
 pub use physical_expr::{
-    physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
-    PhysicalExprRef,
+    create_ordering, physical_exprs_bag_equal, physical_exprs_contains,
+    physical_exprs_equal, PhysicalExprRef,
 };
 
 pub use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs
index a4184845a0..2221bc980f 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -58,6 +58,94 @@ pub fn physical_exprs_bag_equal(
     multi_set_lhs == multi_set_rhs
 }
 
+use crate::{expressions, LexOrdering, PhysicalSortExpr};
+use arrow::compute::SortOptions;
+use arrow::datatypes::Schema;
+use datafusion_common::plan_err;
+use datafusion_common::Result;
+use datafusion_expr::{Expr, SortExpr};
+
+/// Converts logical sort expressions to physical sort expressions
+///
+/// This function transforms a collection of logical sort expressions into their physical
+/// representation that can be used during query execution.
+///
+/// # Arguments
+///
+/// * `schema` - The schema containing column definitions
+/// * `sort_order` - A collection of logical sort expressions grouped into lexicographic orderings
+///
+/// # Returns
+///
+/// A vector of lexicographic orderings for physical execution, or an error if the transformation fails
+///
+/// # Examples
+///
+/// ```
+/// // Create orderings from columns "id" and "name"
+/// # use arrow::datatypes::{Schema, Field, DataType};
+/// # use datafusion_physical_expr::create_ordering;
+/// # use datafusion_common::Column;
+/// # use datafusion_expr::{Expr, SortExpr};
+/// #
+/// // Create a schema with two fields
+/// let schema = Schema::new(vec![
+///     Field::new("id", DataType::Int32, false),
+///     Field::new("name", DataType::Utf8, false),
+/// ]);
+///
+/// let sort_exprs = vec![
+///     vec![
+///         SortExpr { expr: Expr::Column(Column::new(Some("t"), "id")), asc: true, nulls_first: false }
+///     ],
+///     vec![
+///         SortExpr { expr: Expr::Column(Column::new(Some("t"), "name")), asc: false, nulls_first: true }
+///     ]
+/// ];
+/// let result = create_ordering(&schema, &sort_exprs).unwrap();
+/// ```
+pub fn create_ordering(
+    schema: &Schema,
+    sort_order: &[Vec<SortExpr>],
+) -> Result<Vec<LexOrdering>> {
+    let mut all_sort_orders = vec![];
+
+    for (group_idx, exprs) in sort_order.iter().enumerate() {
+        // Construct PhysicalSortExpr objects from Expr objects:
+        let mut sort_exprs = LexOrdering::default();
+        for (expr_idx, sort) in exprs.iter().enumerate() {
+            match &sort.expr {
+                Expr::Column(col) => match expressions::col(&col.name, schema) {
+                    Ok(expr) => {
+                        sort_exprs.push(PhysicalSortExpr {
+                            expr,
+                            options: SortOptions {
+                                descending: !sort.asc,
+                                nulls_first: sort.nulls_first,
+                            },
+                        });
+                    }
+                    // Cannot find expression in the projected_schema, stop iterating
+                    // since rest of the orderings are violated
+                    Err(_) => break,
+                },
+                expr => {
+                    return plan_err!(
+                        "Expected single column reference in sort_order[{}][{}], got {}",
+                        group_idx,
+                        expr_idx,
+                        expr
+                    );
+                }
+            }
+        }
+        if !sort_exprs.is_empty() {
+            all_sort_orders.push(sort_exprs);
+        }
+    }
+    Ok(all_sort_orders)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 148856cd10..c65569ef1c 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -867,7 +867,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     None
                 };
 
-                let provider = ViewTable::try_new(input, definition)?;
+                let provider = ViewTable::new(input, definition);
 
                 let table_name =
                     from_table_reference(scan.table_name.as_ref(), "ViewScan")?;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to