This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new ae1d24e  ARROW-9839: [Rust] [DataFusion] Implement ExecutionPlan.as_any
ae1d24e is described below

commit ae1d24efcc3f1ac2a876d8d9f544a34eb04ae874
Author: Andy Grove <[email protected]>
AuthorDate: Sat Sep 26 14:27:31 2020 -0600

    ARROW-9839: [Rust] [DataFusion] Implement ExecutionPlan.as_any
    
    This allows ExecutionPlan to be downcast to a specific operator.
    
    Closes #8284 from andygrove/ARROW-9839
    
    Authored-by: Andy Grove <[email protected]>
    Signed-off-by: Andy Grove <[email protected]>
---
 rust/datafusion/src/physical_plan/csv.rs            |  6 ++++++
 rust/datafusion/src/physical_plan/empty.rs          |  6 ++++++
 rust/datafusion/src/physical_plan/explain.rs        |  9 ++++++++-
 rust/datafusion/src/physical_plan/filter.rs         |  6 ++++++
 rust/datafusion/src/physical_plan/hash_aggregate.rs |  6 ++++++
 rust/datafusion/src/physical_plan/limit.rs          | 11 +++++++++++
 rust/datafusion/src/physical_plan/memory.rs         |  6 ++++++
 rust/datafusion/src/physical_plan/merge.rs          |  6 ++++++
 rust/datafusion/src/physical_plan/mod.rs            |  4 ++++
 rust/datafusion/src/physical_plan/parquet.rs        |  6 ++++++
 rust/datafusion/src/physical_plan/planner.rs        |  5 +++++
 rust/datafusion/src/physical_plan/projection.rs     |  6 ++++++
 rust/datafusion/src/physical_plan/sort.rs           |  6 ++++++
 rust/datafusion/tests/user_defined_plan.rs          |  5 +++++
 14 files changed, 87 insertions(+), 1 deletion(-)

diff --git a/rust/datafusion/src/physical_plan/csv.rs b/rust/datafusion/src/physical_plan/csv.rs
index 40ca89b..2cf65cc 100644
--- a/rust/datafusion/src/physical_plan/csv.rs
+++ b/rust/datafusion/src/physical_plan/csv.rs
@@ -17,6 +17,7 @@
 
 //! Execution plan for reading CSV files
 
+use std::any::Any;
 use std::fs::File;
 use std::sync::{Arc, Mutex};
 
@@ -179,6 +180,11 @@ impl CsvExec {
 }
 
 impl ExecutionPlan for CsvExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     /// Get the schema for this execution plan
     fn schema(&self) -> SchemaRef {
         self.projected_schema.clone()
diff --git a/rust/datafusion/src/physical_plan/empty.rs b/rust/datafusion/src/physical_plan/empty.rs
index 1f75ad5..0e364ab 100644
--- a/rust/datafusion/src/physical_plan/empty.rs
+++ b/rust/datafusion/src/physical_plan/empty.rs
@@ -17,6 +17,7 @@
 
 //! EmptyRelation execution plan
 
+use std::any::Any;
 use std::sync::{Arc, Mutex};
 
 use crate::error::{ExecutionError, Result};
@@ -39,6 +40,11 @@ impl EmptyExec {
 }
 
 impl ExecutionPlan for EmptyExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
diff --git a/rust/datafusion/src/physical_plan/explain.rs b/rust/datafusion/src/physical_plan/explain.rs
index 7412b6c..7636860 100644
--- a/rust/datafusion/src/physical_plan/explain.rs
+++ b/rust/datafusion/src/physical_plan/explain.rs
@@ -17,6 +17,9 @@
 
 //! Defines the EXPLAIN operator
 
+use std::any::Any;
+use std::sync::{Arc, Mutex};
+
 use crate::error::{ExecutionError, Result};
 use crate::{
     logical_plan::StringifiedPlan,
@@ -29,7 +32,6 @@ use arrow::{
 };
 
 use crate::physical_plan::Partitioning;
-use std::sync::{Arc, Mutex};
 
 /// Explain execution plan operator. This operator contains the string
 /// values of the various plans it has when it is created, and passes
@@ -54,6 +56,11 @@ impl ExplainExec {
 }
 
 impl ExecutionPlan for ExplainExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
diff --git a/rust/datafusion/src/physical_plan/filter.rs b/rust/datafusion/src/physical_plan/filter.rs
index e2de192..9cb199a 100644
--- a/rust/datafusion/src/physical_plan/filter.rs
+++ b/rust/datafusion/src/physical_plan/filter.rs
@@ -18,6 +18,7 @@
 //! FilterExec evaluates a boolean predicate against all input batches to determine which rows to
 //! include in its output batches.
 
+use std::any::Any;
 use std::sync::{Arc, Mutex};
 
 use crate::error::{ExecutionError, Result};
@@ -58,6 +59,11 @@ impl FilterExec {
 }
 
 impl ExecutionPlan for FilterExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     /// Get the schema for this execution plan
     fn schema(&self) -> SchemaRef {
         // The filter operator does not make any changes to the schema of its input
diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs
index e7e2017..eda508b 100644
--- a/rust/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs
@@ -17,6 +17,7 @@
 
 //! Defines the execution plan for the hash aggregate operation
 
+use std::any::Any;
 use std::cell::RefCell;
 use std::rc::Rc;
 use std::sync::{Arc, Mutex};
@@ -116,6 +117,11 @@ impl HashAggregateExec {
 }
 
 impl ExecutionPlan for HashAggregateExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
diff --git a/rust/datafusion/src/physical_plan/limit.rs b/rust/datafusion/src/physical_plan/limit.rs
index 113676b..85582e8 100644
--- a/rust/datafusion/src/physical_plan/limit.rs
+++ b/rust/datafusion/src/physical_plan/limit.rs
@@ -17,6 +17,7 @@
 
 //! Defines the LIMIT plan
 
+use std::any::Any;
 use std::sync::{Arc, Mutex};
 
 use crate::error::{ExecutionError, Result};
@@ -50,6 +51,11 @@ impl GlobalLimitExec {
 }
 
 impl ExecutionPlan for GlobalLimitExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     fn schema(&self) -> SchemaRef {
         self.input.schema()
     }
@@ -126,6 +132,11 @@ impl LocalLimitExec {
 }
 
 impl ExecutionPlan for LocalLimitExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     fn schema(&self) -> SchemaRef {
         self.input.schema()
     }
diff --git a/rust/datafusion/src/physical_plan/memory.rs b/rust/datafusion/src/physical_plan/memory.rs
index ed2c22f..13447a4 100644
--- a/rust/datafusion/src/physical_plan/memory.rs
+++ b/rust/datafusion/src/physical_plan/memory.rs
@@ -17,6 +17,7 @@
 
 //! Execution plan for reading in-memory batches of data
 
+use std::any::Any;
 use std::sync::{Arc, Mutex};
 
 use crate::error::{ExecutionError, Result};
@@ -37,6 +38,11 @@ pub struct MemoryExec {
 }
 
 impl ExecutionPlan for MemoryExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     /// Get the schema for this execution plan
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
diff --git a/rust/datafusion/src/physical_plan/merge.rs b/rust/datafusion/src/physical_plan/merge.rs
index c12b18e..89dc7f0 100644
--- a/rust/datafusion/src/physical_plan/merge.rs
+++ b/rust/datafusion/src/physical_plan/merge.rs
@@ -18,6 +18,7 @@
 //! Defines the merge plan for executing partitions in parallel and then merging the results
 //! into a single partition
 
+use std::any::Any;
 use std::sync::{Arc, Mutex};
 use std::thread::{self, JoinHandle};
 
@@ -50,6 +51,11 @@ impl MergeExec {
 }
 
 impl ExecutionPlan for MergeExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     fn schema(&self) -> SchemaRef {
         self.input.schema()
     }
diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs
index 342f2e5..bdd9716 100644
--- a/rust/datafusion/src/physical_plan/mod.rs
+++ b/rust/datafusion/src/physical_plan/mod.rs
@@ -17,6 +17,7 @@
 
 //! Traits for physical query plan, supporting parallel execution for partitioned relations.
 
+use std::any::Any;
 use std::cell::RefCell;
 use std::fmt::{Debug, Display};
 use std::rc::Rc;
@@ -42,6 +43,9 @@ pub trait PhysicalPlanner {
 
 /// Partition-aware execution plan for a relation
 pub trait ExecutionPlan: Debug + Send + Sync {
+    /// Returns the execution plan as [`Any`](std::any::Any) so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
     /// Get the schema for this execution plan
     fn schema(&self) -> SchemaRef;
     /// Specifies the output partitioning scheme of this plan
diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs
index 318a819..3d74e52 100644
--- a/rust/datafusion/src/physical_plan/parquet.rs
+++ b/rust/datafusion/src/physical_plan/parquet.rs
@@ -17,6 +17,7 @@
 
 //! Execution plan for reading Parquet files
 
+use std::any::Any;
 use std::fs::File;
 use std::rc::Rc;
 use std::sync::{Arc, Mutex};
@@ -87,6 +88,11 @@ impl ParquetExec {
 }
 
 impl ExecutionPlan for ParquetExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs
index 8f8e00e..2fa0e2f 100644
--- a/rust/datafusion/src/physical_plan/planner.rs
+++ b/rust/datafusion/src/physical_plan/planner.rs
@@ -774,6 +774,11 @@ mod tests {
     }
 
     impl ExecutionPlan for NoOpExecutionPlan {
+        /// Return a reference to Any that can be used for downcasting
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
         fn schema(&self) -> SchemaRef {
             self.schema.clone()
         }
diff --git a/rust/datafusion/src/physical_plan/projection.rs b/rust/datafusion/src/physical_plan/projection.rs
index ea60572..d686b0f 100644
--- a/rust/datafusion/src/physical_plan/projection.rs
+++ b/rust/datafusion/src/physical_plan/projection.rs
@@ -20,6 +20,7 @@
 //! of a projection on table `t1` where the expressions `a`, `b`, and `a+b` are the
 //! projection expressions.
 
+use std::any::Any;
 use std::sync::{Arc, Mutex};
 
 use crate::error::{ExecutionError, Result};
@@ -69,6 +70,11 @@ impl ProjectionExec {
 }
 
 impl ExecutionPlan for ProjectionExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     /// Get the schema for this execution plan
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
diff --git a/rust/datafusion/src/physical_plan/sort.rs b/rust/datafusion/src/physical_plan/sort.rs
index c58347a..8f5e534 100644
--- a/rust/datafusion/src/physical_plan/sort.rs
+++ b/rust/datafusion/src/physical_plan/sort.rs
@@ -17,6 +17,7 @@
 
 //! Defines the SORT plan
 
+use std::any::Any;
 use std::sync::{Arc, Mutex};
 
 use arrow::array::ArrayRef;
@@ -57,6 +58,11 @@ impl SortExec {
 }
 
 impl ExecutionPlan for SortExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     fn schema(&self) -> SchemaRef {
         self.input.schema().clone()
     }
diff --git a/rust/datafusion/tests/user_defined_plan.rs b/rust/datafusion/tests/user_defined_plan.rs
index 676e857..c956d58 100644
--- a/rust/datafusion/tests/user_defined_plan.rs
+++ b/rust/datafusion/tests/user_defined_plan.rs
@@ -355,6 +355,11 @@ impl Debug for TopKExec {
 }
 
 impl ExecutionPlan for TopKExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     fn schema(&self) -> SchemaRef {
         self.input.schema()
     }

Reply via email to