This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 2176ff5 Implement serde for ShuffleWriterExec (#712)
2176ff5 is described below
commit 2176ff592e38147c1c779b5660ec73db9c161a67
Author: Andy Grove <[email protected]>
AuthorDate: Mon Jul 12 18:23:43 2021 -0600
Implement serde for ShuffleWriterExec (#712)
---
ballista/rust/core/proto/ballista.proto | 10 +++++++
.../core/src/execution_plans/shuffle_writer.rs | 7 +++--
.../core/src/serde/physical_plan/from_proto.rs | 32 ++++++++++++++++++++-
ballista/rust/core/src/serde/physical_plan/mod.rs | 16 +++++++++++
.../rust/core/src/serde/physical_plan/to_proto.rs | 33 +++++++++++++++++++++-
5 files changed, 94 insertions(+), 4 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index b1c153d..5b3e93e 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -424,6 +424,7 @@ message PhysicalPlanNode {
UnresolvedShuffleExecNode unresolved = 15;
RepartitionExecNode repartition = 16;
WindowAggExecNode window = 17;
+ ShuffleWriterExecNode shuffle_writer = 18;
}
}
@@ -629,6 +630,15 @@ message HashAggregateExecNode {
Schema input_schema = 7;
}
+message ShuffleWriterExecNode {
+ //TODO it seems redundant to provide job and stage id here since we also have them
+ // in the TaskDefinition that wraps this plan
+ string job_id = 1;
+ uint32 stage_id = 2;
+ PhysicalPlanNode input = 3;
+ PhysicalHashRepartition output_partitioning = 4;
+}
+
message ShuffleReaderExecNode {
repeated ShuffleReaderPartition partition = 1;
Schema schema = 2;
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 83d40ae..d5c7d8f 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -126,7 +126,10 @@ impl ExecutionPlan for ShuffleWriterExec {
}
fn output_partitioning(&self) -> Partitioning {
- self.plan.output_partitioning()
+ match &self.shuffle_output_partitioning {
+ Some(p) => p.clone(),
+ _ => Partitioning::UnknownPartitioning(1),
+ }
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -143,7 +146,7 @@ impl ExecutionPlan for ShuffleWriterExec {
self.stage_id,
children[0].clone(),
self.work_dir.clone(),
- None,
+ self.shuffle_output_partitioning.clone(),
)?))
}
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 12c1743..a1a60bd 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -22,7 +22,9 @@ use std::convert::{TryFrom, TryInto};
use std::sync::Arc;
use crate::error::BallistaError;
-use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
+use crate::execution_plans::{
+ ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
+};
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use crate::serde::protobuf::ShuffleReaderPartition;
use crate::serde::scheduler::PartitionLocation;
@@ -370,6 +372,34 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
partition_mode,
)?))
}
+ PhysicalPlanType::ShuffleWriter(shuffle_writer) => {
+ let input: Arc<dyn ExecutionPlan> =
+ convert_box_required!(shuffle_writer.input)?;
+
+ let output_partitioning = match &shuffle_writer.output_partitioning {
+ Some(hash_part) => {
+ let expr = hash_part
+ .hash_expr
+ .iter()
+ .map(|e| e.try_into())
+ .collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
+
+ Some(Partitioning::Hash(
+ expr,
+ hash_part.partition_count.try_into().unwrap(),
+ ))
+ }
+ None => None,
+ };
+
+ Ok(Arc::new(ShuffleWriterExec::try_new(
+ shuffle_writer.job_id.clone(),
+ shuffle_writer.stage_id as usize,
+ input,
+ "".to_string(), // this is intentional but hacky - the executor will fill this in
+ output_partitioning,
+ )?))
+ }
PhysicalPlanType::ShuffleReader(shuffle_reader) => {
let schema = Arc::new(convert_required!(shuffle_reader.schema)?);
let partition_location: Vec<Vec<PartitionLocation>> = shuffle_reader
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index 3bf7e9c..f544859 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -45,6 +45,7 @@ mod roundtrip_tests {
use super::super::super::error::Result;
use super::super::protobuf;
+ use crate::execution_plans::ShuffleWriterExec;
fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
let proto: protobuf::PhysicalPlanNode = exec_plan.clone().try_into()?;
@@ -184,4 +185,19 @@ mod roundtrip_tests {
Arc::new(EmptyExec::new(false, schema)),
)?))
}
+
+ #[test]
+ fn roundtrip_shuffle_writer() -> Result<()> {
+ let field_a = Field::new("a", DataType::Int64, false);
+ let field_b = Field::new("b", DataType::Int64, false);
+ let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+
+ roundtrip_test(Arc::new(ShuffleWriterExec::try_new(
+ "job123".to_string(),
+ 123,
+ Arc::new(EmptyExec::new(false, schema)),
+ "".to_string(),
+ Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 4)),
+ )?))
+ }
}
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 875dbf2..cdd33f9 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -55,7 +55,9 @@ use datafusion::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr};
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
use protobuf::physical_plan_node::PhysicalPlanType;
-use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
+use crate::execution_plans::{
+ ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
+};
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{protobuf, BallistaError};
@@ -356,6 +358,35 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
},
))),
})
+ } else if let Some(exec) = plan.downcast_ref::<ShuffleWriterExec>() {
+ let input: protobuf::PhysicalPlanNode =
+ exec.children()[0].to_owned().try_into()?;
+ Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ShuffleWriter(Box::new(
+ protobuf::ShuffleWriterExecNode {
+ job_id: exec.job_id().to_string(),
+ stage_id: exec.stage_id() as u32,
+ input: Some(Box::new(input)),
+ output_partitioning: match exec.output_partitioning() {
+ Partitioning::Hash(exprs, partition_count) => {
+ Some(protobuf::PhysicalHashRepartition {
+ hash_expr: exprs
+ .iter()
+ .map(|expr| expr.clone().try_into())
+ .collect::<Result<Vec<_>, BallistaError>>()?,
+ partition_count: partition_count as u64,
+ })
+ }
+ other => {
+ return Err(BallistaError::General(format!(
+ "physical_plan::to_proto() invalid partitioning for ShuffleWriterExec: {:?}",
+ other
+ )))
+ }
+ },
+ },
+ ))),
+ })
} else if let Some(exec) = plan.downcast_ref::<UnresolvedShuffleExec>() {
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Unresolved(