This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 0d6d1ce Implement TableProvider for DataFrameImpl (#1699)
0d6d1ce is described below
commit 0d6d1ce5133c7bac9cf313e72f5a529aaefc766c
Author: Phillip Cloud <[email protected]>
AuthorDate: Sun Jan 30 05:42:18 2022 -0500
Implement TableProvider for DataFrameImpl (#1699)
* Add TableProvider impl for DataFrameImpl
* Add physical plan in
* Clean up plan construction and names construction
* Remove duplicate comments
* Remove unused parameter
* Add test
* Remove duplicate limit comment
* Use cloned instead of individual clone
* Reduce the amount of code to get a schema
Co-authored-by: Andrew Lamb <[email protected]>
* Add comments to test
* Fix plan comparison
* Compare only the results of execution
* Remove println
* Refer to df_impl instead of table in test
Co-authored-by: Andrew Lamb <[email protected]>
* Fix the register_table test to use the correct result set for comparison
* Consolidate group/agg exprs
* Format
* Remove outdated comment
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/src/execution/dataframe_impl.rs | 114 +++++++++++++++++++++++++++++
1 file changed, 114 insertions(+)
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index f2d0385..d3f62bb 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -17,8 +17,11 @@
//! Implementation of DataFrame API.
+use std::any::Any;
use std::sync::{Arc, Mutex};
+use crate::arrow::datatypes::Schema;
+use crate::arrow::datatypes::SchemaRef;
use crate::arrow::record_batch::RecordBatch;
use crate::error::Result;
use crate::execution::context::{ExecutionContext, ExecutionContextState};
@@ -26,12 +29,15 @@ use crate::logical_plan::{
col, DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan,
LogicalPlanBuilder,
Partitioning,
};
+use crate::scalar::ScalarValue;
use crate::{
dataframe::*,
physical_plan::{collect, collect_partitioned},
};
use crate::arrow::util::pretty;
+use crate::datasource::TableProvider;
+use crate::datasource::TableType;
use crate::physical_plan::{
execute_stream, execute_stream_partitioned, ExecutionPlan,
SendableRecordBatchStream,
};
@@ -63,6 +69,59 @@ impl DataFrameImpl {
}
#[async_trait]
+impl TableProvider for DataFrameImpl {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ let schema: Schema = self.plan.schema().as_ref().into();
+ Arc::new(schema)
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::View
+ }
+
+ async fn scan(
+ &self,
+ projection: &Option<Vec<usize>>,
+ filters: &[Expr],
+ limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let expr = projection
+ .as_ref()
+ // construct projections
+ .map_or_else(
+ || Ok(Arc::new(Self::new(self.ctx_state.clone(), &self.plan)) as Arc<_>),
+ |projection| {
+ let schema = TableProvider::schema(self).project(projection)?;
+ let names = schema
+ .fields()
+ .iter()
+ .map(|field| field.name().as_str())
+ .collect::<Vec<_>>();
+ self.select_columns(names.as_slice())
+ },
+ )?
+ // add predicates, otherwise use `true` as the predicate
+ .filter(filters.iter().cloned().fold(
+ Expr::Literal(ScalarValue::Boolean(Some(true))),
+ |acc, new| acc.and(new),
+ ))?;
+ // add a limit if given
+ Self::new(
+ self.ctx_state.clone(),
+ &limit
+ .map_or_else(|| Ok(expr.clone()), |n| expr.limit(n))?
+ .to_logical_plan(),
+ )
+ .create_physical_plan()
+ .await
+ }
+}
+
+#[async_trait]
impl DataFrame for DataFrameImpl {
/// Apply a projection based on a list of column names
fn select_columns(&self, columns: &[&str]) -> Result<Arc<dyn DataFrame>> {
@@ -488,6 +547,61 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn register_table() -> Result<()> {
+ let df = test_table().await?.select_columns(&["c1", "c12"])?;
+ let mut ctx = ExecutionContext::new();
+ let df_impl =
+ Arc::new(DataFrameImpl::new(ctx.state.clone(), &df.to_logical_plan()));
+
+ // register a dataframe as a table
+ ctx.register_table("test_table", df_impl.clone())?;
+
+ // pull the table out
+ let table = ctx.table("test_table")?;
+
+ let group_expr = vec![col("c1")];
+ let aggr_expr = vec![sum(col("c12"))];
+
+ // check that we correctly read from the table
+ let df_results = &df_impl
+ .aggregate(group_expr.clone(), aggr_expr.clone())?
+ .collect()
+ .await?;
+ let table_results = &table.aggregate(group_expr, aggr_expr)?.collect().await?;
+
+ assert_batches_sorted_eq!(
+ vec![
+ "+----+-----------------------------+",
+ "| c1 | SUM(aggregate_test_100.c12) |",
+ "+----+-----------------------------+",
+ "| a | 10.238448667882977 |",
+ "| b | 7.797734760124923 |",
+ "| c | 13.860958726523545 |",
+ "| d | 8.793968289758968 |",
+ "| e | 10.206140546981722 |",
+ "+----+-----------------------------+",
+ ],
+ df_results
+ );
+
+ // the results are the same as the results from the view, modulo the leaf table name
+ assert_batches_sorted_eq!(
+ vec![
+ "+----+---------------------+",
+ "| c1 | SUM(test_table.c12) |",
+ "+----+---------------------+",
+ "| a | 10.238448667882977 |",
+ "| b | 7.797734760124923 |",
+ "| c | 13.860958726523545 |",
+ "| d | 8.793968289758968 |",
+ "| e | 10.206140546981722 |",
+ "+----+---------------------+",
+ ],
+ table_results
+ );
+ Ok(())
+ }
/// Compare the formatted string representation of two plans for equality
fn assert_same_plan(plan1: &LogicalPlan, plan2: &LogicalPlan) {
assert_eq!(format!("{:?}", plan1), format!("{:?}", plan2));