This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d6d1ce  Implement TableProvider for DataFrameImpl (#1699)
0d6d1ce is described below

commit 0d6d1ce5133c7bac9cf313e72f5a529aaefc766c
Author: Phillip Cloud <[email protected]>
AuthorDate: Sun Jan 30 05:42:18 2022 -0500

    Implement TableProvider for DataFrameImpl (#1699)
    
    * Add TableProvider impl for DataFrameImpl
    
    * Add physical plan in
    
    * Clean up plan construction and names construction
    
    * Remove duplicate comments
    
    * Remove unused parameter
    
    * Add test
    
    * Remove duplicate limit comment
    
    * Use cloned instead of individual clone
    
    * Reduce the amount of code to get a schema
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Add comments to test
    
    * Fix plan comparison
    
    * Compare only the results of execution
    
    * Remove println
    
    * Refer to df_impl instead of table in test
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Fix the register_table test to use the correct result set for comparison
    
    * Consolidate group/agg exprs
    
    * Format
    
    * Remove outdated comment
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/src/execution/dataframe_impl.rs | 114 +++++++++++++++++++++++++++++
 1 file changed, 114 insertions(+)

diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index f2d0385..d3f62bb 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -17,8 +17,11 @@
 
 //! Implementation of DataFrame API.
 
+use std::any::Any;
 use std::sync::{Arc, Mutex};
 
+use crate::arrow::datatypes::Schema;
+use crate::arrow::datatypes::SchemaRef;
 use crate::arrow::record_batch::RecordBatch;
 use crate::error::Result;
 use crate::execution::context::{ExecutionContext, ExecutionContextState};
@@ -26,12 +29,15 @@ use crate::logical_plan::{
     col, DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, LogicalPlanBuilder,
     Partitioning,
 };
+use crate::scalar::ScalarValue;
 use crate::{
     dataframe::*,
     physical_plan::{collect, collect_partitioned},
 };
 
 use crate::arrow::util::pretty;
+use crate::datasource::TableProvider;
+use crate::datasource::TableType;
 use crate::physical_plan::{
     execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
 };
@@ -63,6 +69,59 @@ impl DataFrameImpl {
 }
 
 #[async_trait]
+impl TableProvider for DataFrameImpl {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        let schema: Schema = self.plan.schema().as_ref().into();
+        Arc::new(schema)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    async fn scan(
+        &self,
+        projection: &Option<Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let expr = projection
+            .as_ref()
+            // construct projections
+            .map_or_else(
+                || Ok(Arc::new(Self::new(self.ctx_state.clone(), &self.plan)) as Arc<_>),
+                |projection| {
+                    let schema = TableProvider::schema(self).project(projection)?;
+                    let names = schema
+                        .fields()
+                        .iter()
+                        .map(|field| field.name().as_str())
+                        .collect::<Vec<_>>();
+                    self.select_columns(names.as_slice())
+                },
+            )?
+            // add predicates, otherwise use `true` as the predicate
+            .filter(filters.iter().cloned().fold(
+                Expr::Literal(ScalarValue::Boolean(Some(true))),
+                |acc, new| acc.and(new),
+            ))?;
+        // add a limit if given
+        Self::new(
+            self.ctx_state.clone(),
+            &limit
+                .map_or_else(|| Ok(expr.clone()), |n| expr.limit(n))?
+                .to_logical_plan(),
+        )
+        .create_physical_plan()
+        .await
+    }
+}
+
+#[async_trait]
 impl DataFrame for DataFrameImpl {
     /// Apply a projection based on a list of column names
     fn select_columns(&self, columns: &[&str]) -> Result<Arc<dyn DataFrame>> {
@@ -488,6 +547,61 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn register_table() -> Result<()> {
+        let df = test_table().await?.select_columns(&["c1", "c12"])?;
+        let mut ctx = ExecutionContext::new();
+        let df_impl =
+            Arc::new(DataFrameImpl::new(ctx.state.clone(), &df.to_logical_plan()));
+
+        // register a dataframe as a table
+        ctx.register_table("test_table", df_impl.clone())?;
+
+        // pull the table out
+        let table = ctx.table("test_table")?;
+
+        let group_expr = vec![col("c1")];
+        let aggr_expr = vec![sum(col("c12"))];
+
+        // check that we correctly read from the table
+        let df_results = &df_impl
+            .aggregate(group_expr.clone(), aggr_expr.clone())?
+            .collect()
+            .await?;
+        let table_results = &table.aggregate(group_expr, aggr_expr)?.collect().await?;
+
+        assert_batches_sorted_eq!(
+            vec![
+                "+----+-----------------------------+",
+                "| c1 | SUM(aggregate_test_100.c12) |",
+                "+----+-----------------------------+",
+                "| a  | 10.238448667882977          |",
+                "| b  | 7.797734760124923           |",
+                "| c  | 13.860958726523545          |",
+                "| d  | 8.793968289758968           |",
+                "| e  | 10.206140546981722          |",
+                "+----+-----------------------------+",
+            ],
+            df_results
+        );
+
+        // the results are the same as the results from the view, modulo the leaf table name
+        assert_batches_sorted_eq!(
+            vec![
+                "+----+---------------------+",
+                "| c1 | SUM(test_table.c12) |",
+                "+----+---------------------+",
+                "| a  | 10.238448667882977  |",
+                "| b  | 7.797734760124923   |",
+                "| c  | 13.860958726523545  |",
+                "| d  | 8.793968289758968   |",
+                "| e  | 10.206140546981722  |",
+                "+----+---------------------+",
+            ],
+            table_results
+        );
+        Ok(())
+    }
     /// Compare the formatted string representation of two plans for equality
     fn assert_same_plan(plan1: &LogicalPlan, plan2: &LogicalPlan) {
         assert_eq!(format!("{:?}", plan1), format!("{:?}", plan2));

Reply via email to