This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new a1a6976d9 union adding arc (#2868)
a1a6976d9 is described below

commit a1a6976d9d8663c629b6081171b9882561c9a81e
Author: comphead <[email protected]>
AuthorDate: Mon Jul 11 11:04:17 2022 -0700

    union adding arc (#2868)
---
 datafusion/expr/src/logical_plan/builder.rs      | 20 +++++++++++++-------
 datafusion/expr/src/logical_plan/plan.rs         |  6 ++++--
 datafusion/expr/src/utils.rs                     |  2 +-
 datafusion/optimizer/src/limit_push_down.rs      |  4 ++--
 datafusion/optimizer/src/projection_push_down.rs |  2 +-
 5 files changed, 21 insertions(+), 13 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index fd1886e20..a3abc694d 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -903,7 +903,7 @@ pub fn union_with_alias(
         .into_iter()
         .flat_map(|p| match p {
             LogicalPlan::Union(Union { inputs, .. }) => inputs,
-            x => vec![x],
+            x => vec![Arc::new(x)],
         });
 
     inputs_iter
@@ -916,16 +916,22 @@ pub fn union_with_alias(
         })?;
 
     let inputs = inputs_iter
-        .map(|p| match p {
+        .map(|p| match p.as_ref() {
             LogicalPlan::Projection(Projection {
                 expr, input, alias, ..
-            }) => {
-                project_with_column_index_alias(expr, input, union_schema.clone(), alias)
-                    .unwrap()
-            }
-            x => x,
+            }) => Arc::new(
+                project_with_column_index_alias(
+                    expr.to_vec(),
+                    input.clone(),
+                    union_schema.clone(),
+                    alias.clone(),
+                )
+                .unwrap(),
+            ),
+            x => Arc::new(x.clone()),
         })
         .collect::<Vec<_>>();
+
     if inputs.is_empty() {
         return Err(DataFusionError::Plan("Empty UNION".to_string()));
     }
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 685225381..0e52ff253 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -277,7 +277,9 @@ impl LogicalPlan {
             LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
             LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
             LogicalPlan::Extension(extension) => extension.node.inputs(),
-            LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(),
+            LogicalPlan::Union(Union { inputs, .. }) => {
+                inputs.iter().map(|arc| arc.as_ref()).collect()
+            }
             LogicalPlan::Distinct(Distinct { input }) => vec![input],
             LogicalPlan::Explain(explain) => vec![&explain.plan],
             LogicalPlan::Analyze(analyze) => vec![&analyze.input],
@@ -1072,7 +1074,7 @@ pub struct Repartition {
 #[derive(Clone)]
 pub struct Union {
     /// Inputs to merge
-    pub inputs: Vec<LogicalPlan>,
+    pub inputs: Vec<Arc<LogicalPlan>>,
     /// Union schema. Should be the same for all inputs.
     pub schema: DFSchemaRef,
     /// Union output relation alias
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 8d7f55417..3b55c4851 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -476,7 +476,7 @@ pub fn from_plan(
         })),
         LogicalPlan::Union(Union { schema, alias, .. }) => {
             Ok(LogicalPlan::Union(Union {
-                inputs: inputs.to_vec(),
+                inputs: inputs.iter().cloned().map(Arc::new).collect(),
                 schema: schema.clone(),
                 alias: alias.clone(),
             }))
diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs
index 6ff30e78d..5c754d833 100644
--- a/datafusion/optimizer/src/limit_push_down.rs
+++ b/datafusion/optimizer/src/limit_push_down.rs
@@ -192,7 +192,7 @@ fn limit_push_down(
             let new_inputs = inputs
                 .iter()
                 .map(|x| {
-                    Ok(LogicalPlan::Limit(Limit {
+                    Ok(Arc::new(LogicalPlan::Limit(Limit {
                         skip: None,
                         fetch: Some(ancestor_fetch),
                         input: Arc::new(limit_push_down(
@@ -204,7 +204,7 @@ fn limit_push_down(
                             x,
                             _optimizer_config,
                         )?),
-                    }))
+                    })))
                 })
                 .collect::<Result<_>>()?;
             Ok(LogicalPlan::Union(Union {
diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs
index a08bf698e..574236a69 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -454,7 +454,7 @@ fn optimize_plan(
                 schema.metadata().clone(),
             )?;
             Ok(LogicalPlan::Union(Union {
-                inputs: new_inputs,
+                inputs: new_inputs.iter().cloned().map(Arc::new).collect(),
                 schema: Arc::new(new_schema),
                 alias: alias.clone(),
             }))

Reply via email to