This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 89e96b404f Derive `Clone` for more ExecutionPlans (#13203)
89e96b404f is described below

commit 89e96b404f07900469fee31740f43edd8a410a10
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Nov 2 06:24:37 2024 -0400

    Derive `Clone` for more ExecutionPlans (#13203)
    
    * Derive `Clone` for more ExecutionPlans
    
    * improve docs
---
 datafusion/physical-plan/src/aggregates/mod.rs                  | 2 +-
 datafusion/physical-plan/src/coalesce_batches.rs                | 2 +-
 datafusion/physical-plan/src/coalesce_partitions.rs             | 2 +-
 datafusion/physical-plan/src/empty.rs                           | 2 +-
 datafusion/physical-plan/src/filter.rs                          | 2 +-
 datafusion/physical-plan/src/insert.rs                          | 1 +
 datafusion/physical-plan/src/joins/cross_join.rs                | 5 +++++
 datafusion/physical-plan/src/joins/hash_join.rs                 | 5 +++++
 datafusion/physical-plan/src/joins/nested_loop_join.rs          | 4 ++++
 datafusion/physical-plan/src/joins/sort_merge_join.rs           | 2 +-
 datafusion/physical-plan/src/joins/symmetric_hash_join.rs       | 2 +-
 datafusion/physical-plan/src/limit.rs                           | 2 +-
 datafusion/physical-plan/src/memory.rs                          | 1 +
 datafusion/physical-plan/src/placeholder_row.rs                 | 2 +-
 datafusion/physical-plan/src/recursive_query.rs                 | 2 +-
 datafusion/physical-plan/src/repartition/mod.rs                 | 2 +-
 datafusion/physical-plan/src/sorts/sort.rs                      | 2 +-
 datafusion/physical-plan/src/sorts/sort_preserving_merge.rs     | 2 +-
 datafusion/physical-plan/src/streaming.rs                       | 1 +
 datafusion/physical-plan/src/union.rs                           | 4 ++--
 datafusion/physical-plan/src/unnest.rs                          | 2 +-
 datafusion/physical-plan/src/values.rs                          | 2 +-
 datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs | 2 +-
 datafusion/physical-plan/src/windows/window_agg_exec.rs         | 2 +-
 24 files changed, 36 insertions(+), 19 deletions(-)
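
For context, the change is largely mechanical: each of these operators stores its child plans behind `Arc<dyn ExecutionPlan>` and its remaining fields already implement `Clone`, so `#[derive(Clone)]` produces a shallow, reference-counted copy. Below is a minimal sketch with stand-in types (not DataFusion's actual structs) of what the derive buys; the same pattern applies to every struct touched in this diff.

    use std::sync::Arc;

    // Stand-in trait; in DataFusion this is the `ExecutionPlan` trait object.
    trait ExecutionPlan: std::fmt::Debug {}

    #[derive(Debug)]
    struct MockInput;
    impl ExecutionPlan for MockInput {}

    // Hypothetical node mirroring the shape of e.g. `FilterExec`: every field
    // is `Clone` (children live behind `Arc`s), so the derived clone only
    // bumps reference counts instead of copying any batches.
    #[derive(Debug, Clone)]
    struct ExampleExec {
        input: Arc<dyn ExecutionPlan>,
    }

    fn main() {
        let plan = ExampleExec { input: Arc::new(MockInput) };
        let copy = plan.clone();
        // The clone shares the same child rather than duplicating it.
        assert!(Arc::ptr_eq(&plan.input, &copy.input));
        println!("{copy:?}");
    }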

diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 4193cc187e..5ffe797c5c 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -344,7 +344,7 @@ impl From<StreamType> for SendableRecordBatchStream {
 }
 
 /// Hash aggregate execution plan
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct AggregateExec {
     /// Aggregation mode (full, partial)
     mode: AggregateMode,
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs
index 61fb3599f0..11678e7a46 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -48,7 +48,7 @@ use futures::stream::{Stream, StreamExt};
 /// reaches the `fetch` value.
 ///
 /// See [`BatchCoalescer`] for more information
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct CoalesceBatchesExec {
     /// The input plan
     input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index f9d4ec6a1a..3da101d609 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -36,7 +36,7 @@ use datafusion_execution::TaskContext;
 
 /// Merge execution plan executes partitions in parallel and combines them into a single
 /// partition. No guarantees are made about the order of the resulting partition.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct CoalescePartitionsExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs
index f6e0abb94f..192619f69f 100644
--- a/datafusion/physical-plan/src/empty.rs
+++ b/datafusion/physical-plan/src/empty.rs
@@ -35,7 +35,7 @@ use datafusion_physical_expr::EquivalenceProperties;
 use log::trace;
 
 /// Execution plan for empty relation with produce_one_row=false
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct EmptyExec {
     /// The schema for the produced row
     schema: SchemaRef,
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 30b0af19f4..97d8159137 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -54,7 +54,7 @@ use log::trace;
 
 /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
 /// include in its output batches.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct FilterExec {
     /// The expression to filter on. This expression must evaluate to a boolean value.
     predicate: Arc<dyn PhysicalExpr>,
diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs
index 8b3ef5ae01..e478cecb7f 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -79,6 +79,7 @@ pub type FileSinkExec = DataSinkExec;
 /// Execution plan for writing record batches to a [`DataSink`]
 ///
 /// Returns a single row with the number of values written
+#[derive(Clone)]
 pub struct DataSinkExec {
     /// Input plan that produces the record batches to be written.
     input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index 8f49885068..a67e1df47b 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -49,8 +49,13 @@ use futures::{ready, Stream, StreamExt, TryStreamExt};
 /// Data of the left side
 type JoinLeftData = (RecordBatch, MemoryReservation);
 
+#[allow(rustdoc::private_intra_doc_links)]
 /// executes partitions in parallel and combines them into a set of
 /// partitions by combining all values from the left with all values on the right
+///
+/// Note that the `Clone` trait is not implemented for this struct due to the
+/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
+/// left side with the processing in each output stream.
 #[derive(Debug)]
 pub struct CrossJoinExec {
     /// left (build) side which gets loaded in memory
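
The note added above (and repeated for `HashJoinExec` and `NestedLoopJoinExec` further down) is why the three joins that buffer their build side through `OnceAsync` keep deriving only `Debug`. Below is a hedged sketch with a stand-in type (`OnceAsync` itself is private to DataFusion) of why the derive is not possible there:

    use std::sync::{Arc, Mutex};

    #[allow(dead_code)]
    trait ExecutionPlan {}

    // Stand-in for the private `OnceAsync`: shared state that loads the build
    // side exactly once and hands it to every output stream. It intentionally
    // has no `Clone` implementation, mirroring the constraint described above.
    #[allow(dead_code)]
    struct OnceAsyncLike {
        slot: Mutex<Option<Vec<u8>>>, // placeholder payload
    }

    // A join-like node holding that coordination state. Writing
    // `#[derive(Clone)]` here would not compile, because the derived impl
    // requires every field to be `Clone` and `OnceAsyncLike` is not; the real
    // join execs therefore document the limitation instead of deriving it.
    #[allow(dead_code)]
    struct JoinLikeExec {
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        left_fut: OnceAsyncLike,
    }

    fn main() {}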
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index c56c179c17..57d8a9ce7b 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -136,6 +136,7 @@ impl JoinLeftData {
     }
 }
 
+#[allow(rustdoc::private_intra_doc_links)]
 /// Join execution plan: Evaluates equijoin predicates in parallel on multiple
 /// partitions using a hash table and an optional filter list to apply post
 /// join.
@@ -293,6 +294,10 @@ impl JoinLeftData {
 ///                       │  "dimension"  │     │    "fact"     │
 ///                       └───────────────┘     └───────────────┘
 /// ```
+///
+/// Note that the `Clone` trait is not implemented for this struct due to the
+/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
+/// left side with the processing in each output stream.
 #[derive(Debug)]
 pub struct HashJoinExec {
     /// left (build) side which gets hashed
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index a87743565a..f36c2395e2 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -105,6 +105,7 @@ impl JoinLeftData {
     }
 }
 
+#[allow(rustdoc::private_intra_doc_links)]
 /// NestedLoopJoinExec is a build-probe join operator, whose main task is to
 /// perform joins without any equijoin conditions in `ON` clause.
 ///
@@ -140,6 +141,9 @@ impl JoinLeftData {
 /// "reports" about probe phase completion (which means that "visited" bitmap 
won't be
 /// updated anymore), and only the last thread, reporting about completion, 
will return output.
 ///
+/// Note that the `Clone` trait is not implemented for this struct due to the
+/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
+/// left side with the processing in each output stream.
 #[derive(Debug)]
 pub struct NestedLoopJoinExec {
     /// left side
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 90dc407fca..3ad892c880 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -71,7 +71,7 @@ use crate::{
 
 /// join execution plan executes partitions in parallel and combines them into a set of
 /// partitions.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct SortMergeJoinExec {
     /// Left sorted joining execution plan
     pub left: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 81c13c6525..5b6dc2cd2a 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -167,7 +167,7 @@ const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
 /// making the smallest value in 'left_sorted' 1231 and any rows below (since ascending)
 /// than that can be dropped from the inner buffer.
 /// ```
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct SymmetricHashJoinExec {
     /// Left side stream
     pub(crate) left: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs
index 1fe550a930..ab1e6cb37b 100644
--- a/datafusion/physical-plan/src/limit.rs
+++ b/datafusion/physical-plan/src/limit.rs
@@ -39,7 +39,7 @@ use futures::stream::{Stream, StreamExt};
 use log::trace;
 
 /// Limit execution plan
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct GlobalLimitExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs
index 56ed144845..c9ada345af 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -40,6 +40,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
 use futures::Stream;
 
 /// Execution plan for reading in-memory batches of data
+#[derive(Clone)]
 pub struct MemoryExec {
     /// The partitions to query
     partitions: Vec<Vec<RecordBatch>>,
diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs
index 5d8ca7e769..f9437f46f8 100644
--- a/datafusion/physical-plan/src/placeholder_row.rs
+++ b/datafusion/physical-plan/src/placeholder_row.rs
@@ -37,7 +37,7 @@ use datafusion_physical_expr::EquivalenceProperties;
 use log::trace;
 
 /// Execution plan for empty relation with produce_one_row=true
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct PlaceholderRowExec {
     /// The schema for the produced row
     schema: SchemaRef,
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index e9ea9d4f50..cbf22a4b39 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -53,7 +53,7 @@ use futures::{ready, Stream, StreamExt};
 /// Note that there won't be any limit or checks applied to detect
 /// an infinite recursion, so it is up to the planner to ensure that
 /// it won't happen.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct RecursiveQueryExec {
     /// Name of the query handler
     name: String,
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 06144f98c8..bc65b25156 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -399,7 +399,7 @@ impl BatchPartitioner {
 /// Paper](https://w6113.github.io/files/papers/volcanoparallelism-89.pdf)
 /// which uses the term "Exchange" for the concept of repartitioning
 /// data across threads.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct RepartitionExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 32d6d3e007..d90d0f64ce 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -675,7 +675,7 @@ pub(crate) fn lexsort_to_indices_multi_columns(
 ///
 /// Support sorting datasets that are larger than the memory allotted
 /// by the memory manager, by spilling to disk.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct SortExec {
     /// Input schema
     pub(crate) input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index ae39cfe412..9ee0faaa0a 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -71,7 +71,7 @@ use log::{debug, trace};
 ///
 /// If any of the input partitions return an error, the error is propagated to
 /// the output and inputs are not polled again.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct SortPreservingMergeExec {
     /// Input plan
     input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs
index cdb94af1fe..7ccef32480 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -55,6 +55,7 @@ pub trait PartitionStream: Debug + Send + Sync {
 ///
 /// If your source can be represented as one or more [`PartitionStream`]s, you can
 /// use this struct to implement [`ExecutionPlan`].
+#[derive(Clone)]
 pub struct StreamingTableExec {
     partitions: Vec<Arc<dyn PartitionStream>>,
     projection: Option<Arc<[usize]>>,
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 69cc63f8f6..bd36753880 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -85,7 +85,7 @@ use tokio::macros::support::thread_rng_n;
 ///                  │Input 1          │  │Input 2           │
 ///                  └─────────────────┘  └──────────────────┘
 /// ```
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct UnionExec {
     /// Input execution plan
     inputs: Vec<Arc<dyn ExecutionPlan>>,
@@ -298,7 +298,7 @@ impl ExecutionPlan for UnionExec {
 /// |         |-----------------+
 /// +---------+
 /// ```
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct InterleaveExec {
     /// Input execution plan
     inputs: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index 3e312b7451..b7b9f17eb1 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -56,7 +56,7 @@ use log::trace;
 /// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m')
 ///
 /// See [`UnnestOptions`] for more details and an example.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct UnnestExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs
index 991146d245..edadf98cb1 100644
--- a/datafusion/physical-plan/src/values.rs
+++ b/datafusion/physical-plan/src/values.rs
@@ -36,7 +36,7 @@ use datafusion_execution::TaskContext;
 use datafusion_physical_expr::EquivalenceProperties;
 
 /// Execution plan for values list based relation (produces constant rows)
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ValuesExec {
     /// The schema
     schema: SchemaRef,
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index 2c60be49a4..8c0331f945 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -67,7 +67,7 @@ use indexmap::IndexMap;
 use log::debug;
 
 /// Window execution plan
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct BoundedWindowAggExec {
     /// Input plan
     input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index 1318f36f26..f71a0b9fd0 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -46,7 +46,7 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 use futures::{ready, Stream, StreamExt};
 
 /// Window execution plan
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct WindowAggExec {
     /// Input plan
     pub(crate) input: Arc<dyn ExecutionPlan>,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
