This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new ae1d24e ARROW-9839: [Rust] [DataFusion] Implement ExecutionPlan.as_any
ae1d24e is described below
commit ae1d24efcc3f1ac2a876d8d9f544a34eb04ae874
Author: Andy Grove <[email protected]>
AuthorDate: Sat Sep 26 14:27:31 2020 -0600
ARROW-9839: [Rust] [DataFusion] Implement ExecutionPlan.as_any
This allows ExecutionPlan to be downcast to a specific operator.
Closes #8284 from andygrove/ARROW-9839
Authored-by: Andy Grove <[email protected]>
Signed-off-by: Andy Grove <[email protected]>
---
rust/datafusion/src/physical_plan/csv.rs | 6 ++++++
rust/datafusion/src/physical_plan/empty.rs | 6 ++++++
rust/datafusion/src/physical_plan/explain.rs | 9 ++++++++-
rust/datafusion/src/physical_plan/filter.rs | 6 ++++++
rust/datafusion/src/physical_plan/hash_aggregate.rs | 6 ++++++
rust/datafusion/src/physical_plan/limit.rs | 11 +++++++++++
rust/datafusion/src/physical_plan/memory.rs | 6 ++++++
rust/datafusion/src/physical_plan/merge.rs | 6 ++++++
rust/datafusion/src/physical_plan/mod.rs | 4 ++++
rust/datafusion/src/physical_plan/parquet.rs | 6 ++++++
rust/datafusion/src/physical_plan/planner.rs | 5 +++++
rust/datafusion/src/physical_plan/projection.rs | 6 ++++++
rust/datafusion/src/physical_plan/sort.rs | 6 ++++++
rust/datafusion/tests/user_defined_plan.rs | 5 +++++
14 files changed, 87 insertions(+), 1 deletion(-)
diff --git a/rust/datafusion/src/physical_plan/csv.rs
b/rust/datafusion/src/physical_plan/csv.rs
index 40ca89b..2cf65cc 100644
--- a/rust/datafusion/src/physical_plan/csv.rs
+++ b/rust/datafusion/src/physical_plan/csv.rs
@@ -17,6 +17,7 @@
//! Execution plan for reading CSV files
+use std::any::Any;
use std::fs::File;
use std::sync::{Arc, Mutex};
@@ -179,6 +180,11 @@ impl CsvExec {
}
impl ExecutionPlan for CsvExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef {
self.projected_schema.clone()
diff --git a/rust/datafusion/src/physical_plan/empty.rs
b/rust/datafusion/src/physical_plan/empty.rs
index 1f75ad5..0e364ab 100644
--- a/rust/datafusion/src/physical_plan/empty.rs
+++ b/rust/datafusion/src/physical_plan/empty.rs
@@ -17,6 +17,7 @@
//! EmptyRelation execution plan
+use std::any::Any;
use std::sync::{Arc, Mutex};
use crate::error::{ExecutionError, Result};
@@ -39,6 +40,11 @@ impl EmptyExec {
}
impl ExecutionPlan for EmptyExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
diff --git a/rust/datafusion/src/physical_plan/explain.rs
b/rust/datafusion/src/physical_plan/explain.rs
index 7412b6c..7636860 100644
--- a/rust/datafusion/src/physical_plan/explain.rs
+++ b/rust/datafusion/src/physical_plan/explain.rs
@@ -17,6 +17,9 @@
//! Defines the EXPLAIN operator
+use std::any::Any;
+use std::sync::{Arc, Mutex};
+
use crate::error::{ExecutionError, Result};
use crate::{
logical_plan::StringifiedPlan,
@@ -29,7 +32,6 @@ use arrow::{
};
use crate::physical_plan::Partitioning;
-use std::sync::{Arc, Mutex};
/// Explain execution plan operator. This operator contains the string
/// values of the various plans it has when it is created, and passes
@@ -54,6 +56,11 @@ impl ExplainExec {
}
impl ExecutionPlan for ExplainExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
diff --git a/rust/datafusion/src/physical_plan/filter.rs
b/rust/datafusion/src/physical_plan/filter.rs
index e2de192..9cb199a 100644
--- a/rust/datafusion/src/physical_plan/filter.rs
+++ b/rust/datafusion/src/physical_plan/filter.rs
@@ -18,6 +18,7 @@
//! FilterExec evaluates a boolean predicate against all input batches to
determine which rows to
//! include in its output batches.
+use std::any::Any;
use std::sync::{Arc, Mutex};
use crate::error::{ExecutionError, Result};
@@ -58,6 +59,11 @@ impl FilterExec {
}
impl ExecutionPlan for FilterExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef {
// The filter operator does not make any changes to the schema of its
input
diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs
b/rust/datafusion/src/physical_plan/hash_aggregate.rs
index e7e2017..eda508b 100644
--- a/rust/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs
@@ -17,6 +17,7 @@
//! Defines the execution plan for the hash aggregate operation
+use std::any::Any;
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
@@ -116,6 +117,11 @@ impl HashAggregateExec {
}
impl ExecutionPlan for HashAggregateExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
diff --git a/rust/datafusion/src/physical_plan/limit.rs
b/rust/datafusion/src/physical_plan/limit.rs
index 113676b..85582e8 100644
--- a/rust/datafusion/src/physical_plan/limit.rs
+++ b/rust/datafusion/src/physical_plan/limit.rs
@@ -17,6 +17,7 @@
//! Defines the LIMIT plan
+use std::any::Any;
use std::sync::{Arc, Mutex};
use crate::error::{ExecutionError, Result};
@@ -50,6 +51,11 @@ impl GlobalLimitExec {
}
impl ExecutionPlan for GlobalLimitExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
fn schema(&self) -> SchemaRef {
self.input.schema()
}
@@ -126,6 +132,11 @@ impl LocalLimitExec {
}
impl ExecutionPlan for LocalLimitExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
fn schema(&self) -> SchemaRef {
self.input.schema()
}
diff --git a/rust/datafusion/src/physical_plan/memory.rs
b/rust/datafusion/src/physical_plan/memory.rs
index ed2c22f..13447a4 100644
--- a/rust/datafusion/src/physical_plan/memory.rs
+++ b/rust/datafusion/src/physical_plan/memory.rs
@@ -17,6 +17,7 @@
//! Execution plan for reading in-memory batches of data
+use std::any::Any;
use std::sync::{Arc, Mutex};
use crate::error::{ExecutionError, Result};
@@ -37,6 +38,11 @@ pub struct MemoryExec {
}
impl ExecutionPlan for MemoryExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef {
self.schema.clone()
diff --git a/rust/datafusion/src/physical_plan/merge.rs
b/rust/datafusion/src/physical_plan/merge.rs
index c12b18e..89dc7f0 100644
--- a/rust/datafusion/src/physical_plan/merge.rs
+++ b/rust/datafusion/src/physical_plan/merge.rs
@@ -18,6 +18,7 @@
//! Defines the merge plan for executing partitions in parallel and then
merging the results
//! into a single partition
+use std::any::Any;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
@@ -50,6 +51,11 @@ impl MergeExec {
}
impl ExecutionPlan for MergeExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
fn schema(&self) -> SchemaRef {
self.input.schema()
}
diff --git a/rust/datafusion/src/physical_plan/mod.rs
b/rust/datafusion/src/physical_plan/mod.rs
index 342f2e5..bdd9716 100644
--- a/rust/datafusion/src/physical_plan/mod.rs
+++ b/rust/datafusion/src/physical_plan/mod.rs
@@ -17,6 +17,7 @@
//! Traits for physical query plan, supporting parallel execution for
partitioned relations.
+use std::any::Any;
use std::cell::RefCell;
use std::fmt::{Debug, Display};
use std::rc::Rc;
@@ -42,6 +43,9 @@ pub trait PhysicalPlanner {
/// Partition-aware execution plan for a relation
pub trait ExecutionPlan: Debug + Send + Sync {
+ /// Returns the execution plan as [`Any`](std::any::Any) so that it can be
+ /// downcast to a specific implementation.
+ fn as_any(&self) -> &dyn Any;
/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef;
/// Specifies the output partitioning scheme of this plan
diff --git a/rust/datafusion/src/physical_plan/parquet.rs
b/rust/datafusion/src/physical_plan/parquet.rs
index 318a819..3d74e52 100644
--- a/rust/datafusion/src/physical_plan/parquet.rs
+++ b/rust/datafusion/src/physical_plan/parquet.rs
@@ -17,6 +17,7 @@
//! Execution plan for reading Parquet files
+use std::any::Any;
use std::fs::File;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
@@ -87,6 +88,11 @@ impl ParquetExec {
}
impl ExecutionPlan for ParquetExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
diff --git a/rust/datafusion/src/physical_plan/planner.rs
b/rust/datafusion/src/physical_plan/planner.rs
index 8f8e00e..2fa0e2f 100644
--- a/rust/datafusion/src/physical_plan/planner.rs
+++ b/rust/datafusion/src/physical_plan/planner.rs
@@ -774,6 +774,11 @@ mod tests {
}
impl ExecutionPlan for NoOpExecutionPlan {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
diff --git a/rust/datafusion/src/physical_plan/projection.rs
b/rust/datafusion/src/physical_plan/projection.rs
index ea60572..d686b0f 100644
--- a/rust/datafusion/src/physical_plan/projection.rs
+++ b/rust/datafusion/src/physical_plan/projection.rs
@@ -20,6 +20,7 @@
//! of a projection on table `t1` where the expressions `a`, `b`, and `a+b`
are the
//! projection expressions.
+use std::any::Any;
use std::sync::{Arc, Mutex};
use crate::error::{ExecutionError, Result};
@@ -69,6 +70,11 @@ impl ProjectionExec {
}
impl ExecutionPlan for ProjectionExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef {
self.schema.clone()
diff --git a/rust/datafusion/src/physical_plan/sort.rs
b/rust/datafusion/src/physical_plan/sort.rs
index c58347a..8f5e534 100644
--- a/rust/datafusion/src/physical_plan/sort.rs
+++ b/rust/datafusion/src/physical_plan/sort.rs
@@ -17,6 +17,7 @@
//! Defines the SORT plan
+use std::any::Any;
use std::sync::{Arc, Mutex};
use arrow::array::ArrayRef;
@@ -57,6 +58,11 @@ impl SortExec {
}
impl ExecutionPlan for SortExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
fn schema(&self) -> SchemaRef {
self.input.schema().clone()
}
diff --git a/rust/datafusion/tests/user_defined_plan.rs
b/rust/datafusion/tests/user_defined_plan.rs
index 676e857..c956d58 100644
--- a/rust/datafusion/tests/user_defined_plan.rs
+++ b/rust/datafusion/tests/user_defined_plan.rs
@@ -355,6 +355,11 @@ impl Debug for TopKExec {
}
impl ExecutionPlan for TopKExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
fn schema(&self) -> SchemaRef {
self.input.schema()
}