This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 217f8c8  ARROW-5946: [Rust] [DataFusion] Fix bug in projection push 
down logic
217f8c8 is described below

commit 217f8c86ddf394aacf61bce7ccd45d875f86b964
Author: Andy Grove <[email protected]>
AuthorDate: Mon Jul 15 07:47:46 2019 -0600

    ARROW-5946: [Rust] [DataFusion] Fix bug in projection push down logic
    
    There was a dumb bug when deriving the projected schema. The following code 
was used:
    
    ```
    for i in in 0..projection.len()
    ```
    
    This should have been
    
    ```
    for i in in 0..&projection
    ```
    
    This PR modifies the `TableScan` variant of `LogicalPlan` to store both the 
underlying table schema, and the projected schema. I added a unit test to 
demonstrate that the bug is now fixed.
    
    Author: Andy Grove <[email protected]>
    
    Closes #4878 from andygrove/ARROW-5946 and squashes the following commits:
    
    8ac74c81d <Andy Grove> remove println
    a992c1703 <Andy Grove> Fix bug in projection push down
---
 rust/datafusion/src/execution/context.rs           |  3 +-
 rust/datafusion/src/logicalplan.rs                 | 13 ++++--
 .../src/optimizer/projection_push_down.rs          | 37 ++++++++++-----
 rust/datafusion/src/sql/parser.rs                  |  2 -
 rust/datafusion/src/sql/planner.rs                 |  4 +-
 rust/datafusion/tests/sql.rs                       | 52 ++++++++++++++++++++++
 testing                                            |  2 +-
 7 files changed, 91 insertions(+), 22 deletions(-)

diff --git a/rust/datafusion/src/execution/context.rs 
b/rust/datafusion/src/execution/context.rs
index 1a31b9a..064b3e4 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -172,7 +172,8 @@ impl ExecutionContext {
                 Ok(Arc::new(TableImpl::new(Arc::new(LogicalPlan::TableScan {
                     schema_name: "".to_string(),
                     table_name: table_name.to_string(),
-                    schema: provider.schema().clone(),
+                    table_schema: provider.schema().clone(),
+                    projected_schema: provider.schema().clone(),
                     projection: None,
                 }))))
             }
diff --git a/rust/datafusion/src/logicalplan.rs 
b/rust/datafusion/src/logicalplan.rs
index ca49789..cc24149 100644
--- a/rust/datafusion/src/logicalplan.rs
+++ b/rust/datafusion/src/logicalplan.rs
@@ -423,8 +423,10 @@ pub enum LogicalPlan {
         schema_name: String,
         /// The name of the table
         table_name: String,
-        /// The schema description
-        schema: Arc<Schema>,
+        /// The underlying table schema
+        table_schema: Arc<Schema>,
+        /// The projected schema
+        projected_schema: Arc<Schema>,
         /// Optional column indices to use as a projection
         projection: Option<Vec<usize>>,
     },
@@ -462,7 +464,9 @@ impl LogicalPlan {
     pub fn schema(&self) -> &Arc<Schema> {
         match self {
             LogicalPlan::EmptyRelation { schema } => &schema,
-            LogicalPlan::TableScan { schema, .. } => &schema,
+            LogicalPlan::TableScan {
+                projected_schema, ..
+            } => &projected_schema,
             LogicalPlan::Projection { schema, .. } => &schema,
             LogicalPlan::Selection { input, .. } => input.schema(),
             LogicalPlan::Aggregate { schema, .. } => &schema,
@@ -622,7 +626,8 @@ mod tests {
         let plan = Arc::new(LogicalPlan::TableScan {
             schema_name: "".to_string(),
             table_name: "people".to_string(),
-            schema: Arc::new(schema),
+            table_schema: Arc::new(schema.clone()),
+            projected_schema: Arc::new(schema),
             projection: Some(vec![0, 1, 4]),
         });
 
diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs 
b/rust/datafusion/src/optimizer/projection_push_down.rs
index 9f9534b..6bc35da 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -139,9 +139,16 @@ impl ProjectionPushDown {
             LogicalPlan::TableScan {
                 schema_name,
                 table_name,
-                schema,
+                table_schema,
+                projection,
                 ..
             } => {
+                if projection.is_some() {
+                    return Err(ExecutionError::General(
+                        "Cannot run projection push-down rule more than 
once".to_string(),
+                    ));
+                }
+
                 // once we reach the table scan, we can use the accumulated 
set of column
                 // indexes as the projection in the table scan
                 let mut projection: Vec<usize> = 
Vec::with_capacity(accum.len());
@@ -153,8 +160,8 @@ impl ProjectionPushDown {
                 // create the projected schema
                 let mut projected_fields: Vec<Field> =
                     Vec::with_capacity(projection.len());
-                for i in 0..projection.len() {
-                    projected_fields.push(schema.fields()[i].clone());
+                for i in &projection {
+                    projected_fields.push(table_schema.fields()[*i].clone());
                 }
                 let projected_schema = Schema::new(projected_fields);
 
@@ -169,7 +176,7 @@ impl ProjectionPushDown {
                     ));
                 }
 
-                for i in 0..schema.fields().len() {
+                for i in 0..table_schema.fields().len() {
                     if let Some(n) = projection.iter().position(|v| *v == i) {
                         mapping.insert(i, n);
                     }
@@ -179,7 +186,8 @@ impl ProjectionPushDown {
                 Ok(Arc::new(LogicalPlan::TableScan {
                     schema_name: schema_name.to_string(),
                     table_name: table_name.to_string(),
-                    schema: Arc::new(projected_schema),
+                    table_schema: table_schema.clone(),
+                    projected_schema: Arc::new(projected_schema),
                     projection: Some(projection),
                 }))
             }
@@ -381,8 +389,11 @@ mod tests {
         // check that table scan schema now contains 2 columns
         match optimized_plan.as_ref().borrow() {
             LogicalPlan::Projection { input, .. } => match 
input.as_ref().borrow() {
-                LogicalPlan::TableScan { ref schema, .. } => {
-                    assert_eq!(2, schema.fields().len());
+                LogicalPlan::TableScan {
+                    ref projected_schema,
+                    ..
+                } => {
+                    assert_eq!(2, projected_schema.fields().len());
                 }
                 _ => assert!(false),
             },
@@ -403,14 +414,16 @@ mod tests {
 
     /// all tests share a common table
     fn test_table_scan() -> LogicalPlan {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::UInt32, false),
+            Field::new("b", DataType::UInt32, false),
+            Field::new("c", DataType::UInt32, false),
+        ]));
         TableScan {
             schema_name: "default".to_string(),
             table_name: "test".to_string(),
-            schema: Arc::new(Schema::new(vec![
-                Field::new("a", DataType::UInt32, false),
-                Field::new("b", DataType::UInt32, false),
-                Field::new("c", DataType::UInt32, false),
-            ])),
+            table_schema: schema.clone(),
+            projected_schema: schema,
             projection: None,
         }
     }
diff --git a/rust/datafusion/src/sql/parser.rs 
b/rust/datafusion/src/sql/parser.rs
index c86fbdf..4b8bf0f 100644
--- a/rust/datafusion/src/sql/parser.rs
+++ b/rust/datafusion/src/sql/parser.rs
@@ -162,8 +162,6 @@ impl DFParser {
                         }
                     }
 
-                    //println!("Parsed {} column defs", columns.len());
-
                     let mut headers = true;
                     let file_type: FileType = if self
                         .parser
diff --git a/rust/datafusion/src/sql/planner.rs 
b/rust/datafusion/src/sql/planner.rs
index 71ad507..5c94b3a 100644
--- a/rust/datafusion/src/sql/planner.rs
+++ b/rust/datafusion/src/sql/planner.rs
@@ -108,7 +108,6 @@ impl SqlToRel {
                             .collect::<Result<Vec<Expr>>>()?,
                         None => vec![],
                     };
-                    //println!("GROUP BY: {:?}", group_expr);
 
                     let mut all_fields: Vec<Expr> = group_expr.clone();
                     aggr_expr.iter().for_each(|x| all_fields.push(x.clone()));
@@ -196,7 +195,8 @@ impl SqlToRel {
                     Some(schema) => Ok(Arc::new(LogicalPlan::TableScan {
                         schema_name: String::from("default"),
                         table_name: id.clone(),
-                        schema: schema.clone(),
+                        table_schema: schema.clone(),
+                        projected_schema: schema.clone(),
                         projection: None,
                     })),
                     None => Err(ExecutionError::General(format!(
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 34a3b0f..cac6fa9 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -30,10 +30,62 @@ use datafusion::datasource::parquet::ParquetTable;
 use datafusion::datasource::TableProvider;
 use datafusion::execution::context::ExecutionContext;
 use datafusion::execution::relation::Relation;
+use datafusion::logicalplan::LogicalPlan;
 
 const DEFAULT_BATCH_SIZE: usize = 1024 * 1024;
 
 #[test]
+fn nyc() {
+    // schema for nyxtaxi csv files
+    let schema = Schema::new(vec![
+        Field::new("VendorID", DataType::Utf8, true),
+        Field::new("tpep_pickup_datetime", DataType::Utf8, true),
+        Field::new("tpep_dropoff_datetime", DataType::Utf8, true),
+        Field::new("passenger_count", DataType::Utf8, true),
+        Field::new("trip_distance", DataType::Float64, true),
+        Field::new("RatecodeID", DataType::Utf8, true),
+        Field::new("store_and_fwd_flag", DataType::Utf8, true),
+        Field::new("PULocationID", DataType::Utf8, true),
+        Field::new("DOLocationID", DataType::Utf8, true),
+        Field::new("payment_type", DataType::Utf8, true),
+        Field::new("fare_amount", DataType::Float64, true),
+        Field::new("extra", DataType::Float64, true),
+        Field::new("mta_tax", DataType::Float64, true),
+        Field::new("tip_amount", DataType::Float64, true),
+        Field::new("tolls_amount", DataType::Float64, true),
+        Field::new("improvement_surcharge", DataType::Float64, true),
+        Field::new("total_amount", DataType::Float64, true),
+    ]);
+
+    let mut ctx = ExecutionContext::new();
+    ctx.register_csv("tripdata", "file.csv", &schema, true);
+
+    let optimized_plan = ctx
+        .create_logical_plan(
+            "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount) \
+             FROM tripdata GROUP BY passenger_count",
+        )
+        .unwrap();
+
+    println!("Logical plan: {:?}", optimized_plan);
+
+    match optimized_plan.as_ref() {
+        LogicalPlan::Aggregate { input, .. } => match input.as_ref() {
+            LogicalPlan::TableScan {
+                ref projected_schema,
+                ..
+            } => {
+                assert_eq!(2, projected_schema.fields().len());
+                assert_eq!(projected_schema.field(0).name(), 
"passenger_count");
+                assert_eq!(projected_schema.field(1).name(), "fare_amount");
+            }
+            _ => assert!(false),
+        },
+        _ => assert!(false),
+    }
+}
+
+#[test]
 fn parquet_query() {
     let mut ctx = ExecutionContext::new();
     ctx.register_table(
diff --git a/testing b/testing
index d14764e..a674dac 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit d14764eff71c51156bea2a7860f8df811d6c9f11
+Subproject commit a674dac190c5fc626964c9b611c67552fa2e530d

Reply via email to