This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 2176ff5  Implement serde for ShuffleWriterExec (#712)
2176ff5 is described below

commit 2176ff592e38147c1c779b5660ec73db9c161a67
Author: Andy Grove <[email protected]>
AuthorDate: Mon Jul 12 18:23:43 2021 -0600

    Implement serde for ShuffleWriterExec (#712)
---
 ballista/rust/core/proto/ballista.proto            | 10 +++++++
 .../core/src/execution_plans/shuffle_writer.rs     |  7 +++--
 .../core/src/serde/physical_plan/from_proto.rs     | 32 ++++++++++++++++++++-
 ballista/rust/core/src/serde/physical_plan/mod.rs  | 16 +++++++++++
 .../rust/core/src/serde/physical_plan/to_proto.rs  | 33 +++++++++++++++++++++-
 5 files changed, 94 insertions(+), 4 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index b1c153d..5b3e93e 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -424,6 +424,7 @@ message PhysicalPlanNode {
     UnresolvedShuffleExecNode unresolved = 15;
     RepartitionExecNode repartition = 16;
     WindowAggExecNode window = 17;
+    ShuffleWriterExecNode shuffle_writer = 18;
   }
 }
 
@@ -629,6 +630,15 @@ message HashAggregateExecNode {
   Schema input_schema = 7;
 }
 
+message ShuffleWriterExecNode {
+  //TODO it seems redundant to provide job and stage id here since we also have them
+  // in the TaskDefinition that wraps this plan
+  string job_id = 1;
+  uint32 stage_id = 2;
+  PhysicalPlanNode input = 3;
+  PhysicalHashRepartition output_partitioning = 4;
+}
+
 message ShuffleReaderExecNode {
   repeated ShuffleReaderPartition partition = 1;
   Schema schema = 2;
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 83d40ae..d5c7d8f 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -126,7 +126,10 @@ impl ExecutionPlan for ShuffleWriterExec {
     }
 
     fn output_partitioning(&self) -> Partitioning {
-        self.plan.output_partitioning()
+        match &self.shuffle_output_partitioning {
+            Some(p) => p.clone(),
+            _ => Partitioning::UnknownPartitioning(1),
+        }
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -143,7 +146,7 @@ impl ExecutionPlan for ShuffleWriterExec {
             self.stage_id,
             children[0].clone(),
             self.work_dir.clone(),
-            None,
+            self.shuffle_output_partitioning.clone(),
         )?))
     }
 
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 12c1743..a1a60bd 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -22,7 +22,9 @@ use std::convert::{TryFrom, TryInto};
 use std::sync::Arc;
 
 use crate::error::BallistaError;
-use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
+use crate::execution_plans::{
+    ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
+};
 use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
 use crate::serde::protobuf::ShuffleReaderPartition;
 use crate::serde::scheduler::PartitionLocation;
@@ -370,6 +372,34 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     partition_mode,
                 )?))
             }
+            PhysicalPlanType::ShuffleWriter(shuffle_writer) => {
+                let input: Arc<dyn ExecutionPlan> =
+                    convert_box_required!(shuffle_writer.input)?;
+
+                let output_partitioning = match &shuffle_writer.output_partitioning {
+                    Some(hash_part) => {
+                        let expr = hash_part
+                            .hash_expr
+                            .iter()
+                            .map(|e| e.try_into())
+                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
+
+                        Some(Partitioning::Hash(
+                            expr,
+                            hash_part.partition_count.try_into().unwrap(),
+                        ))
+                    }
+                    None => None,
+                };
+
+                Ok(Arc::new(ShuffleWriterExec::try_new(
+                    shuffle_writer.job_id.clone(),
+                    shuffle_writer.stage_id as usize,
+                    input,
+                    "".to_string(), // this is intentional but hacky - the executor will fill this in
+                    output_partitioning,
+                )?))
+            }
             PhysicalPlanType::ShuffleReader(shuffle_reader) => {
                 let schema = Arc::new(convert_required!(shuffle_reader.schema)?);
                 let partition_location: Vec<Vec<PartitionLocation>> = shuffle_reader
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index 3bf7e9c..f544859 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -45,6 +45,7 @@ mod roundtrip_tests {
 
     use super::super::super::error::Result;
     use super::super::protobuf;
+    use crate::execution_plans::ShuffleWriterExec;
 
     fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
         let proto: protobuf::PhysicalPlanNode = exec_plan.clone().try_into()?;
@@ -184,4 +185,19 @@ mod roundtrip_tests {
             Arc::new(EmptyExec::new(false, schema)),
         )?))
     }
+
+    #[test]
+    fn roundtrip_shuffle_writer() -> Result<()> {
+        let field_a = Field::new("a", DataType::Int64, false);
+        let field_b = Field::new("b", DataType::Int64, false);
+        let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+
+        roundtrip_test(Arc::new(ShuffleWriterExec::try_new(
+            "job123".to_string(),
+            123,
+            Arc::new(EmptyExec::new(false, schema)),
+            "".to_string(),
+            Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 4)),
+        )?))
+    }
 }
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 875dbf2..cdd33f9 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -55,7 +55,9 @@ use datafusion::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr};
 use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
 use protobuf::physical_plan_node::PhysicalPlanType;
 
-use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
+use crate::execution_plans::{
+    ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
+};
 use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
 use crate::serde::scheduler::PartitionLocation;
 use crate::serde::{protobuf, BallistaError};
@@ -356,6 +358,35 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                     },
                 ))),
             })
+        } else if let Some(exec) = plan.downcast_ref::<ShuffleWriterExec>() {
+            let input: protobuf::PhysicalPlanNode =
+                exec.children()[0].to_owned().try_into()?;
+            Ok(protobuf::PhysicalPlanNode {
+                physical_plan_type: Some(PhysicalPlanType::ShuffleWriter(Box::new(
+                    protobuf::ShuffleWriterExecNode {
+                        job_id: exec.job_id().to_string(),
+                        stage_id: exec.stage_id() as u32,
+                        input: Some(Box::new(input)),
+                        output_partitioning: match exec.output_partitioning() {
+                            Partitioning::Hash(exprs, partition_count) => {
+                                Some(protobuf::PhysicalHashRepartition {
+                                    hash_expr: exprs
+                                        .iter()
+                                        .map(|expr| expr.clone().try_into())
+                                        .collect::<Result<Vec<_>, BallistaError>>()?,
+                                    partition_count: partition_count as u64,
+                                })
+                            }
+                            other => {
+                                return Err(BallistaError::General(format!(
+                                    "physical_plan::to_proto() invalid partitioning for ShuffleWriterExec: {:?}",
+                                    other
+                                )))
+                            }
+                        },
+                    },
+                ))),
+            })
         } else if let Some(exec) = plan.downcast_ref::<UnresolvedShuffleExec>() {
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::Unresolved(

Reply via email to