This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 217f8c8 ARROW-5946: [Rust] [DataFusion] Fix bug in projection push
down logic
217f8c8 is described below
commit 217f8c86ddf394aacf61bce7ccd45d875f86b964
Author: Andy Grove <[email protected]>
AuthorDate: Mon Jul 15 07:47:46 2019 -0600
ARROW-5946: [Rust] [DataFusion] Fix bug in projection push down logic
There was a dumb bug when deriving the projected schema. The following code
was used:
```
for i in 0..projection.len()
```
This should have been
```
for i in &projection
```
This PR modifies the `TableScan` variant of `LogicalPlan` to store both the
underlying table schema, and the projected schema. I added a unit test to
demonstrate that the bug is now fixed.
Author: Andy Grove <[email protected]>
Closes #4878 from andygrove/ARROW-5946 and squashes the following commits:
8ac74c81d <Andy Grove> remove println
a992c1703 <Andy Grove> Fix bug in projection push down
---
rust/datafusion/src/execution/context.rs | 3 +-
rust/datafusion/src/logicalplan.rs | 13 ++++--
.../src/optimizer/projection_push_down.rs | 37 ++++++++++-----
rust/datafusion/src/sql/parser.rs | 2 -
rust/datafusion/src/sql/planner.rs | 4 +-
rust/datafusion/tests/sql.rs | 52 ++++++++++++++++++++++
testing | 2 +-
7 files changed, 91 insertions(+), 22 deletions(-)
diff --git a/rust/datafusion/src/execution/context.rs
b/rust/datafusion/src/execution/context.rs
index 1a31b9a..064b3e4 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -172,7 +172,8 @@ impl ExecutionContext {
Ok(Arc::new(TableImpl::new(Arc::new(LogicalPlan::TableScan {
schema_name: "".to_string(),
table_name: table_name.to_string(),
- schema: provider.schema().clone(),
+ table_schema: provider.schema().clone(),
+ projected_schema: provider.schema().clone(),
projection: None,
}))))
}
diff --git a/rust/datafusion/src/logicalplan.rs
b/rust/datafusion/src/logicalplan.rs
index ca49789..cc24149 100644
--- a/rust/datafusion/src/logicalplan.rs
+++ b/rust/datafusion/src/logicalplan.rs
@@ -423,8 +423,10 @@ pub enum LogicalPlan {
schema_name: String,
/// The name of the table
table_name: String,
- /// The schema description
- schema: Arc<Schema>,
+ /// The underlying table schema
+ table_schema: Arc<Schema>,
+ /// The projected schema
+ projected_schema: Arc<Schema>,
/// Optional column indices to use as a projection
projection: Option<Vec<usize>>,
},
@@ -462,7 +464,9 @@ impl LogicalPlan {
pub fn schema(&self) -> &Arc<Schema> {
match self {
LogicalPlan::EmptyRelation { schema } => &schema,
- LogicalPlan::TableScan { schema, .. } => &schema,
+ LogicalPlan::TableScan {
+ projected_schema, ..
+ } => &projected_schema,
LogicalPlan::Projection { schema, .. } => &schema,
LogicalPlan::Selection { input, .. } => input.schema(),
LogicalPlan::Aggregate { schema, .. } => &schema,
@@ -622,7 +626,8 @@ mod tests {
let plan = Arc::new(LogicalPlan::TableScan {
schema_name: "".to_string(),
table_name: "people".to_string(),
- schema: Arc::new(schema),
+ table_schema: Arc::new(schema.clone()),
+ projected_schema: Arc::new(schema),
projection: Some(vec![0, 1, 4]),
});
diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs
b/rust/datafusion/src/optimizer/projection_push_down.rs
index 9f9534b..6bc35da 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -139,9 +139,16 @@ impl ProjectionPushDown {
LogicalPlan::TableScan {
schema_name,
table_name,
- schema,
+ table_schema,
+ projection,
..
} => {
+ if projection.is_some() {
+ return Err(ExecutionError::General(
+ "Cannot run projection push-down rule more than
once".to_string(),
+ ));
+ }
+
// once we reach the table scan, we can use the accumulated
set of column
// indexes as the projection in the table scan
let mut projection: Vec<usize> =
Vec::with_capacity(accum.len());
@@ -153,8 +160,8 @@ impl ProjectionPushDown {
// create the projected schema
let mut projected_fields: Vec<Field> =
Vec::with_capacity(projection.len());
- for i in 0..projection.len() {
- projected_fields.push(schema.fields()[i].clone());
+ for i in &projection {
+ projected_fields.push(table_schema.fields()[*i].clone());
}
let projected_schema = Schema::new(projected_fields);
@@ -169,7 +176,7 @@ impl ProjectionPushDown {
));
}
- for i in 0..schema.fields().len() {
+ for i in 0..table_schema.fields().len() {
if let Some(n) = projection.iter().position(|v| *v == i) {
mapping.insert(i, n);
}
@@ -179,7 +186,8 @@ impl ProjectionPushDown {
Ok(Arc::new(LogicalPlan::TableScan {
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
- schema: Arc::new(projected_schema),
+ table_schema: table_schema.clone(),
+ projected_schema: Arc::new(projected_schema),
projection: Some(projection),
}))
}
@@ -381,8 +389,11 @@ mod tests {
// check that table scan schema now contains 2 columns
match optimized_plan.as_ref().borrow() {
LogicalPlan::Projection { input, .. } => match
input.as_ref().borrow() {
- LogicalPlan::TableScan { ref schema, .. } => {
- assert_eq!(2, schema.fields().len());
+ LogicalPlan::TableScan {
+ ref projected_schema,
+ ..
+ } => {
+ assert_eq!(2, projected_schema.fields().len());
}
_ => assert!(false),
},
@@ -403,14 +414,16 @@ mod tests {
/// all tests share a common table
fn test_table_scan() -> LogicalPlan {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::UInt32, false),
+ Field::new("b", DataType::UInt32, false),
+ Field::new("c", DataType::UInt32, false),
+ ]));
TableScan {
schema_name: "default".to_string(),
table_name: "test".to_string(),
- schema: Arc::new(Schema::new(vec![
- Field::new("a", DataType::UInt32, false),
- Field::new("b", DataType::UInt32, false),
- Field::new("c", DataType::UInt32, false),
- ])),
+ table_schema: schema.clone(),
+ projected_schema: schema,
projection: None,
}
}
diff --git a/rust/datafusion/src/sql/parser.rs
b/rust/datafusion/src/sql/parser.rs
index c86fbdf..4b8bf0f 100644
--- a/rust/datafusion/src/sql/parser.rs
+++ b/rust/datafusion/src/sql/parser.rs
@@ -162,8 +162,6 @@ impl DFParser {
}
}
- //println!("Parsed {} column defs", columns.len());
-
let mut headers = true;
let file_type: FileType = if self
.parser
diff --git a/rust/datafusion/src/sql/planner.rs
b/rust/datafusion/src/sql/planner.rs
index 71ad507..5c94b3a 100644
--- a/rust/datafusion/src/sql/planner.rs
+++ b/rust/datafusion/src/sql/planner.rs
@@ -108,7 +108,6 @@ impl SqlToRel {
.collect::<Result<Vec<Expr>>>()?,
None => vec![],
};
- //println!("GROUP BY: {:?}", group_expr);
let mut all_fields: Vec<Expr> = group_expr.clone();
aggr_expr.iter().for_each(|x| all_fields.push(x.clone()));
@@ -196,7 +195,8 @@ impl SqlToRel {
Some(schema) => Ok(Arc::new(LogicalPlan::TableScan {
schema_name: String::from("default"),
table_name: id.clone(),
- schema: schema.clone(),
+ table_schema: schema.clone(),
+ projected_schema: schema.clone(),
projection: None,
})),
None => Err(ExecutionError::General(format!(
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 34a3b0f..cac6fa9 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -30,10 +30,62 @@ use datafusion::datasource::parquet::ParquetTable;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::ExecutionContext;
use datafusion::execution::relation::Relation;
+use datafusion::logicalplan::LogicalPlan;
const DEFAULT_BATCH_SIZE: usize = 1024 * 1024;
#[test]
+fn nyc() {
+ // schema for nyxtaxi csv files
+ let schema = Schema::new(vec![
+ Field::new("VendorID", DataType::Utf8, true),
+ Field::new("tpep_pickup_datetime", DataType::Utf8, true),
+ Field::new("tpep_dropoff_datetime", DataType::Utf8, true),
+ Field::new("passenger_count", DataType::Utf8, true),
+ Field::new("trip_distance", DataType::Float64, true),
+ Field::new("RatecodeID", DataType::Utf8, true),
+ Field::new("store_and_fwd_flag", DataType::Utf8, true),
+ Field::new("PULocationID", DataType::Utf8, true),
+ Field::new("DOLocationID", DataType::Utf8, true),
+ Field::new("payment_type", DataType::Utf8, true),
+ Field::new("fare_amount", DataType::Float64, true),
+ Field::new("extra", DataType::Float64, true),
+ Field::new("mta_tax", DataType::Float64, true),
+ Field::new("tip_amount", DataType::Float64, true),
+ Field::new("tolls_amount", DataType::Float64, true),
+ Field::new("improvement_surcharge", DataType::Float64, true),
+ Field::new("total_amount", DataType::Float64, true),
+ ]);
+
+ let mut ctx = ExecutionContext::new();
+ ctx.register_csv("tripdata", "file.csv", &schema, true);
+
+ let optimized_plan = ctx
+ .create_logical_plan(
+ "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount) \
+ FROM tripdata GROUP BY passenger_count",
+ )
+ .unwrap();
+
+ println!("Logical plan: {:?}", optimized_plan);
+
+ match optimized_plan.as_ref() {
+ LogicalPlan::Aggregate { input, .. } => match input.as_ref() {
+ LogicalPlan::TableScan {
+ ref projected_schema,
+ ..
+ } => {
+ assert_eq!(2, projected_schema.fields().len());
+ assert_eq!(projected_schema.field(0).name(),
"passenger_count");
+ assert_eq!(projected_schema.field(1).name(), "fare_amount");
+ }
+ _ => assert!(false),
+ },
+ _ => assert!(false),
+ }
+}
+
+#[test]
fn parquet_query() {
let mut ctx = ExecutionContext::new();
ctx.register_table(
diff --git a/testing b/testing
index d14764e..a674dac 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit d14764eff71c51156bea2a7860f8df811d6c9f11
+Subproject commit a674dac190c5fc626964c9b611c67552fa2e530d