This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 89e96b404f Derive `Clone` for more ExecutionPlans (#13203)
89e96b404f is described below
commit 89e96b404f07900469fee31740f43edd8a410a10
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Nov 2 06:24:37 2024 -0400
Derive `Clone` for more ExecutionPlans (#13203)
* Derive `Clone` for more ExecutionPlans
* improve docs
---
datafusion/physical-plan/src/aggregates/mod.rs | 2 +-
datafusion/physical-plan/src/coalesce_batches.rs | 2 +-
datafusion/physical-plan/src/coalesce_partitions.rs | 2 +-
datafusion/physical-plan/src/empty.rs | 2 +-
datafusion/physical-plan/src/filter.rs | 2 +-
datafusion/physical-plan/src/insert.rs | 1 +
datafusion/physical-plan/src/joins/cross_join.rs | 5 +++++
datafusion/physical-plan/src/joins/hash_join.rs | 5 +++++
datafusion/physical-plan/src/joins/nested_loop_join.rs | 4 ++++
datafusion/physical-plan/src/joins/sort_merge_join.rs | 2 +-
datafusion/physical-plan/src/joins/symmetric_hash_join.rs | 2 +-
datafusion/physical-plan/src/limit.rs | 2 +-
datafusion/physical-plan/src/memory.rs | 1 +
datafusion/physical-plan/src/placeholder_row.rs | 2 +-
datafusion/physical-plan/src/recursive_query.rs | 2 +-
datafusion/physical-plan/src/repartition/mod.rs | 2 +-
datafusion/physical-plan/src/sorts/sort.rs | 2 +-
datafusion/physical-plan/src/sorts/sort_preserving_merge.rs | 2 +-
datafusion/physical-plan/src/streaming.rs | 1 +
datafusion/physical-plan/src/union.rs | 4 ++--
datafusion/physical-plan/src/unnest.rs | 2 +-
datafusion/physical-plan/src/values.rs | 2 +-
datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs | 2 +-
datafusion/physical-plan/src/windows/window_agg_exec.rs | 2 +-
24 files changed, 36 insertions(+), 19 deletions(-)
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 4193cc187e..5ffe797c5c 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -344,7 +344,7 @@ impl From<StreamType> for SendableRecordBatchStream {
}
/// Hash aggregate execution plan
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct AggregateExec {
/// Aggregation mode (full, partial)
mode: AggregateMode,
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs
index 61fb3599f0..11678e7a46 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -48,7 +48,7 @@ use futures::stream::{Stream, StreamExt};
/// reaches the `fetch` value.
///
/// See [`BatchCoalescer`] for more information
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct CoalesceBatchesExec {
/// The input plan
input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index f9d4ec6a1a..3da101d609 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -36,7 +36,7 @@ use datafusion_execution::TaskContext;
/// Merge execution plan executes partitions in parallel and combines them into a single
/// partition. No guarantees are made about the order of the resulting partition.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct CoalescePartitionsExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs
index f6e0abb94f..192619f69f 100644
--- a/datafusion/physical-plan/src/empty.rs
+++ b/datafusion/physical-plan/src/empty.rs
@@ -35,7 +35,7 @@ use datafusion_physical_expr::EquivalenceProperties;
use log::trace;
/// Execution plan for empty relation with produce_one_row=false
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct EmptyExec {
/// The schema for the produced row
schema: SchemaRef,
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 30b0af19f4..97d8159137 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -54,7 +54,7 @@ use log::trace;
/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
/// include in its output batches.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct FilterExec {
/// The expression to filter on. This expression must evaluate to a boolean value.
predicate: Arc<dyn PhysicalExpr>,
diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs
index 8b3ef5ae01..e478cecb7f 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -79,6 +79,7 @@ pub type FileSinkExec = DataSinkExec;
/// Execution plan for writing record batches to a [`DataSink`]
///
/// Returns a single row with the number of values written
+#[derive(Clone)]
pub struct DataSinkExec {
/// Input plan that produces the record batches to be written.
input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index 8f49885068..a67e1df47b 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -49,8 +49,13 @@ use futures::{ready, Stream, StreamExt, TryStreamExt};
/// Data of the left side
type JoinLeftData = (RecordBatch, MemoryReservation);
+#[allow(rustdoc::private_intra_doc_links)]
/// executes partitions in parallel and combines them into a set of
/// partitions by combining all values from the left with all values on the right
+///
+/// Note that the `Clone` trait is not implemented for this struct due to the
+/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
+/// left side with the processing in each output stream.
#[derive(Debug)]
pub struct CrossJoinExec {
/// left (build) side which gets loaded in memory
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index c56c179c17..57d8a9ce7b 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -136,6 +136,7 @@ impl JoinLeftData {
}
}
+#[allow(rustdoc::private_intra_doc_links)]
/// Join execution plan: Evaluates eqijoin predicates in parallel on multiple
/// partitions using a hash table and an optional filter list to apply post
/// join.
@@ -293,6 +294,10 @@ impl JoinLeftData {
/// │ "dimension" │ │ "fact" │
/// └───────────────┘ └───────────────┘
/// ```
+///
+/// Note that the `Clone` trait is not implemented for this struct due to the
+/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
+/// left side with the processing in each output stream.
#[derive(Debug)]
pub struct HashJoinExec {
/// left (build) side which gets hashed
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index a87743565a..f36c2395e2 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -105,6 +105,7 @@ impl JoinLeftData {
}
}
+#[allow(rustdoc::private_intra_doc_links)]
/// NestedLoopJoinExec is build-probe join operator, whose main task is to
/// perform joins without any equijoin conditions in `ON` clause.
///
@@ -140,6 +141,9 @@ impl JoinLeftData {
/// "reports" about probe phase completion (which means that "visited" bitmap won't be
/// updated anymore), and only the last thread, reporting about completion, will return output.
///
+/// Note that the `Clone` trait is not implemented for this struct due to the
+/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the
+/// left side with the processing in each output stream.
#[derive(Debug)]
pub struct NestedLoopJoinExec {
/// left side
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 90dc407fca..3ad892c880 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -71,7 +71,7 @@ use crate::{
/// join execution plan executes partitions in parallel and combines them into a set of
/// partitions.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct SortMergeJoinExec {
/// Left sorted joining execution plan
pub left: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 81c13c6525..5b6dc2cd2a 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -167,7 +167,7 @@ const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
/// making the smallest value in 'left_sorted' 1231 and any rows below (since ascending)
/// than that can be dropped from the inner buffer.
/// ```
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct SymmetricHashJoinExec {
/// Left side stream
pub(crate) left: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs
index 1fe550a930..ab1e6cb37b 100644
--- a/datafusion/physical-plan/src/limit.rs
+++ b/datafusion/physical-plan/src/limit.rs
@@ -39,7 +39,7 @@ use futures::stream::{Stream, StreamExt};
use log::trace;
/// Limit execution plan
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct GlobalLimitExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs
index 56ed144845..c9ada345af 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -40,6 +40,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use futures::Stream;
/// Execution plan for reading in-memory batches of data
+#[derive(Clone)]
pub struct MemoryExec {
/// The partitions to query
partitions: Vec<Vec<RecordBatch>>,
diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs
index 5d8ca7e769..f9437f46f8 100644
--- a/datafusion/physical-plan/src/placeholder_row.rs
+++ b/datafusion/physical-plan/src/placeholder_row.rs
@@ -37,7 +37,7 @@ use datafusion_physical_expr::EquivalenceProperties;
use log::trace;
/// Execution plan for empty relation with produce_one_row=true
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct PlaceholderRowExec {
/// The schema for the produced row
schema: SchemaRef,
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index e9ea9d4f50..cbf22a4b39 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -53,7 +53,7 @@ use futures::{ready, Stream, StreamExt};
/// Note that there won't be any limit or checks applied to detect
/// an infinite recursion, so it is up to the planner to ensure that
/// it won't happen.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct RecursiveQueryExec {
/// Name of the query handler
name: String,
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 06144f98c8..bc65b25156 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -399,7 +399,7 @@ impl BatchPartitioner {
/// Paper](https://w6113.github.io/files/papers/volcanoparallelism-89.pdf)
/// which uses the term "Exchange" for the concept of repartitioning
/// data across threads.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct RepartitionExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 32d6d3e007..d90d0f64ce 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -675,7 +675,7 @@ pub(crate) fn lexsort_to_indices_multi_columns(
///
/// Support sorting datasets that are larger than the memory allotted
/// by the memory manager, by spilling to disk.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct SortExec {
/// Input schema
pub(crate) input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index ae39cfe412..9ee0faaa0a 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -71,7 +71,7 @@ use log::{debug, trace};
///
/// If any of the input partitions return an error, the error is propagated to
/// the output and inputs are not polled again.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct SortPreservingMergeExec {
/// Input plan
input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs
index cdb94af1fe..7ccef32480 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -55,6 +55,7 @@ pub trait PartitionStream: Debug + Send + Sync {
///
/// If your source can be represented as one or more [`PartitionStream`]s, you can
/// use this struct to implement [`ExecutionPlan`].
+#[derive(Clone)]
pub struct StreamingTableExec {
partitions: Vec<Arc<dyn PartitionStream>>,
projection: Option<Arc<[usize]>>,
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 69cc63f8f6..bd36753880 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -85,7 +85,7 @@ use tokio::macros::support::thread_rng_n;
/// │Input 1 │ │Input 2 │
/// └─────────────────┘ └──────────────────┘
/// ```
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct UnionExec {
/// Input execution plan
inputs: Vec<Arc<dyn ExecutionPlan>>,
@@ -298,7 +298,7 @@ impl ExecutionPlan for UnionExec {
/// | |-----------------+
/// +---------+
/// ```
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct InterleaveExec {
/// Input execution plan
inputs: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index 3e312b7451..b7b9f17eb1 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -56,7 +56,7 @@ use log::trace;
/// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m')
///
/// See [`UnnestOptions`] for more details and an example.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct UnnestExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs
index 991146d245..edadf98cb1 100644
--- a/datafusion/physical-plan/src/values.rs
+++ b/datafusion/physical-plan/src/values.rs
@@ -36,7 +36,7 @@ use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
/// Execution plan for values list based relation (produces constant rows)
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct ValuesExec {
/// The schema
schema: SchemaRef,
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index 2c60be49a4..8c0331f945 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -67,7 +67,7 @@ use indexmap::IndexMap;
use log::debug;
/// Window execution plan
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct BoundedWindowAggExec {
/// Input plan
input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index 1318f36f26..f71a0b9fd0 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -46,7 +46,7 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use futures::{ready, Stream, StreamExt};
/// Window execution plan
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct WindowAggExec {
/// Input plan
pub(crate) input: Arc<dyn ExecutionPlan>,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]