This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new cb84504fe `LogicalPlanBuilder` now uses `TableSource` instead of `TableProvider` (#2569)
cb84504fe is described below
commit cb84504fed4e613c9ed18c4e2a2022c701add2d9
Author: Andy Grove <[email protected]>
AuthorDate: Sat May 21 05:15:43 2022 -0600
`LogicalPlanBuilder` now uses `TableSource` instead of `TableProvider` (#2569)
* add scan_empty method to tests
* update tests to use new scan_empty test method
* remove LogicalPlanBuilder::scan_empty
* LogicalPlanBuilder now uses TableSource instead of TableProvider
---
ballista/rust/core/src/serde/logical_plan/mod.rs | 8 ++++----
datafusion-examples/examples/custom_datasource.rs | 16 ++++++++++------
datafusion/core/src/execution/context.rs | 15 +++++++++------
datafusion/core/src/logical_plan/builder.rs | 16 ++++++++--------
datafusion/core/src/sql/planner.rs | 16 ++++++++++------
datafusion/core/src/test_util.rs | 8 ++++++--
datafusion/core/tests/parquet_pruning.rs | 17 +++++++++++------
datafusion/core/tests/sql/projection.rs | 9 +++++----
8 files changed, 63 insertions(+), 42 deletions(-)
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 070dad98a..f088f2f1f 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -32,9 +32,9 @@ use datafusion::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias,
Window,
};
use datafusion::logical_plan::{
- source_as_provider, Column, CreateCatalog, CreateCatalogSchema,
CreateExternalTable,
- CreateView, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan,
LogicalPlanBuilder,
- Offset, Repartition, TableScan, Values,
+ provider_as_source, source_as_provider, Column, CreateCatalog,
CreateCatalogSchema,
+ CreateExternalTable, CreateView, CrossJoin, Expr, JoinConstraint, Limit,
LogicalPlan,
+ LogicalPlanBuilder, Offset, Repartition, TableScan, Values,
};
use datafusion::prelude::SessionContext;
@@ -252,7 +252,7 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlanBuilder::scan_with_filters(
&scan.table_name,
- Arc::new(provider),
+ provider_as_source(Arc::new(provider)),
projection,
filters,
)?
diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs
index a9a8ef7aa..a814e585e 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -23,7 +23,7 @@ use datafusion::dataframe::DataFrame;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
-use datafusion::logical_plan::{Expr, LogicalPlanBuilder};
+use datafusion::logical_plan::{provider_as_source, Expr, LogicalPlanBuilder};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
@@ -60,11 +60,15 @@ async fn search_accounts(
let ctx = SessionContext::new();
// create logical plan composed of a single TableScan
- let logical_plan =
- LogicalPlanBuilder::scan_with_filters("accounts", Arc::new(db), None,
vec![])
- .unwrap()
- .build()
- .unwrap();
+ let logical_plan = LogicalPlanBuilder::scan_with_filters(
+ "accounts",
+ provider_as_source(Arc::new(db)),
+ None,
+ vec![],
+ )
+ .unwrap()
+ .build()
+ .unwrap();
let mut dataframe = DataFrame::new(ctx.state, &logical_plan)
.select_columns(&["id", "bank_account"])?;
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ca3bca61d..629adf137 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -61,9 +61,9 @@ use crate::datasource::listing::ListingTableConfig;
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{
- CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
- CreateView, DropTable, FileType, FunctionRegistry, LogicalPlan,
LogicalPlanBuilder,
- UNNAMED_TABLE,
+ provider_as_source, CreateCatalog, CreateCatalogSchema,
CreateExternalTable,
+ CreateMemoryTable, CreateView, DropTable, FileType, FunctionRegistry,
LogicalPlan,
+ LogicalPlanBuilder, UNNAMED_TABLE,
};
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::optimizer::filter_push_down::FilterPushDown;
@@ -586,7 +586,9 @@ impl SessionContext {
.with_schema(resolved_schema);
let provider = ListingTable::try_new(config)?;
- let plan = LogicalPlanBuilder::scan(path, Arc::new(provider),
None)?.build()?;
+ let plan =
+ LogicalPlanBuilder::scan(path,
provider_as_source(Arc::new(provider)), None)?
+ .build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
@@ -620,7 +622,8 @@ impl SessionContext {
pub fn read_table(&self, provider: Arc<dyn TableProvider>) ->
Result<Arc<DataFrame>> {
Ok(Arc::new(DataFrame::new(
self.state.clone(),
- &LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None)?.build()?,
+ &LogicalPlanBuilder::scan(UNNAMED_TABLE,
provider_as_source(provider), None)?
+ .build()?,
)))
}
@@ -817,7 +820,7 @@ impl SessionContext {
Some(ref provider) => {
let plan = LogicalPlanBuilder::scan(
table_ref.table(),
- Arc::clone(provider),
+ provider_as_source(Arc::clone(provider)),
None,
)?
.build()?;
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index f8b1fefdd..cad1b0a6a 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -17,7 +17,6 @@
//! This module provides a builder for creating LogicalPlans
-use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::logical_expr::ExprSchemable;
use crate::logical_plan::plan::{
@@ -41,11 +40,12 @@ use std::{
use super::{Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
use crate::logical_plan::{
columnize_expr, exprlist_to_fields, normalize_col, normalize_cols,
- provider_as_source, rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField,
DFSchema,
- DFSchemaRef, Limit, Offset, Partitioning, Repartition, Values,
+ rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema,
DFSchemaRef, Limit,
+ Offset, Partitioning, Repartition, Values,
};
use datafusion_common::ToDFSchema;
+use datafusion_expr::TableSource;
/// Default table name for unnamed table
pub const UNNAMED_TABLE: &str = "?table?";
@@ -191,16 +191,16 @@ impl LogicalPlanBuilder {
/// Convert a table provider into a builder with a TableScan
pub fn scan(
table_name: impl Into<String>,
- provider: Arc<dyn TableProvider>,
+ table_source: Arc<dyn TableSource>,
projection: Option<Vec<usize>>,
) -> Result<Self> {
- Self::scan_with_filters(table_name, provider, projection, vec![])
+ Self::scan_with_filters(table_name, table_source, projection, vec![])
}
/// Convert a table provider into a builder with a TableScan
pub fn scan_with_filters(
table_name: impl Into<String>,
- provider: Arc<dyn TableProvider>,
+ table_source: Arc<dyn TableSource>,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
) -> Result<Self> {
@@ -212,7 +212,7 @@ impl LogicalPlanBuilder {
));
}
- let schema = provider.schema();
+ let schema = table_source.schema();
let projected_schema = projection
.as_ref()
@@ -232,7 +232,7 @@ impl LogicalPlanBuilder {
let table_scan = LogicalPlan::TableScan(TableScan {
table_name,
- source: provider_as_source(provider),
+ source: table_source,
projected_schema: Arc::new(projected_schema),
projection,
filters,
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index 9f96097ea..5113c9f5b 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -28,10 +28,11 @@ use crate::datasource::TableProvider;
use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
use crate::logical_plan::Expr::Alias;
use crate::logical_plan::{
- and, col, lit, normalize_col, normalize_col_with_schemas, Column,
CreateCatalog,
- CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable,
- CreateMemoryTable, CreateView, DFSchema, DFSchemaRef, DropTable, Expr,
FileType,
- LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema,
ToStringifiedPlan,
+ and, col, lit, normalize_col, normalize_col_with_schemas,
provider_as_source,
+ union_with_alias, Column, CreateCatalog, CreateCatalogSchema,
+ CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable,
CreateView,
+ DFSchema, DFSchemaRef, DropTable, Expr, FileType, LogicalPlan,
LogicalPlanBuilder,
+ Operator, PlanType, ToDFSchema, ToStringifiedPlan,
};
use crate::prelude::JoinType;
use crate::scalar::ScalarValue;
@@ -714,8 +715,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
_ => Ok(cte_plan.clone()),
},
(_, Ok(provider)) => {
- let scan =
- LogicalPlanBuilder::scan(&table_name,
provider, None);
+ let scan = LogicalPlanBuilder::scan(
+ &table_name,
+ provider_as_source(provider),
+ None,
+ );
let scan = match table_alias.as_ref() {
Some(ref name) =>
scan?.alias(name.to_owned().as_str()),
_ => scan,
diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs
index 3e174e9dc..1a6a5028e 100644
--- a/datafusion/core/src/test_util.rs
+++ b/datafusion/core/src/test_util.rs
@@ -21,7 +21,7 @@ use std::collections::BTreeMap;
use std::{env, error::Error, path::PathBuf, sync::Arc};
use crate::datasource::empty::EmptyTable;
-use crate::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE};
+use crate::logical_plan::{provider_as_source, LogicalPlanBuilder,
UNNAMED_TABLE};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::DataFusionError;
@@ -243,7 +243,11 @@ pub fn scan_empty(
) -> Result<LogicalPlanBuilder, DataFusionError> {
let table_schema = Arc::new(table_schema.clone());
let provider = Arc::new(EmptyTable::new(table_schema));
- LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), provider,
projection)
+ LogicalPlanBuilder::scan(
+ name.unwrap_or(UNNAMED_TABLE),
+ provider_as_source(provider),
+ projection,
+ )
}
/// Get the schema for the aggregate_test_* csv files
diff --git a/datafusion/core/tests/parquet_pruning.rs b/datafusion/core/tests/parquet_pruning.rs
index 0d580f2d2..7e7caa959 100644
--- a/datafusion/core/tests/parquet_pruning.rs
+++ b/datafusion/core/tests/parquet_pruning.rs
@@ -30,6 +30,7 @@ use arrow::{
util::pretty::pretty_format_batches,
};
use chrono::{Datelike, Duration};
+use datafusion::logical_plan::provider_as_source;
use datafusion::{
datasource::TableProvider,
logical_plan::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder},
@@ -544,12 +545,16 @@ impl ContextWithParquet {
/// the number of output rows and normalized execution metrics
async fn query_with_expr(&mut self, expr: Expr) -> TestOutput {
let sql = format!("EXPR only: {:?}", expr);
- let logical_plan = LogicalPlanBuilder::scan("t",
self.provider.clone(), None)
- .unwrap()
- .filter(expr)
- .unwrap()
- .build()
- .unwrap();
+ let logical_plan = LogicalPlanBuilder::scan(
+ "t",
+ provider_as_source(self.provider.clone()),
+ None,
+ )
+ .unwrap()
+ .filter(expr)
+ .unwrap()
+ .build()
+ .unwrap();
self.run_test(logical_plan, sql).await
}
diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs
index e1b1742bf..c74445bfd 100644
--- a/datafusion/core/tests/sql/projection.rs
+++ b/datafusion/core/tests/sql/projection.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE};
+use datafusion::logical_plan::{provider_as_source, LogicalPlanBuilder,
UNNAMED_TABLE};
use datafusion::test_util::scan_empty;
use tempfile::TempDir;
@@ -239,9 +239,10 @@ async fn projection_on_memory_scan() -> Result<()> {
)?]];
let provider = Arc::new(MemTable::try_new(schema, partitions)?);
- let plan = LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None)?
- .project(vec![col("b")])?
- .build()?;
+ let plan =
+ LogicalPlanBuilder::scan(UNNAMED_TABLE, provider_as_source(provider),
None)?
+ .project(vec![col("b")])?
+ .build()?;
assert_fields_eq(&plan, vec!["b"]);
let ctx = SessionContext::new();