This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 62b0349 Remove hard-coded PartitionMode from Ballista serde (#637)
62b0349 is described below
commit 62b03493e3dd02814af3e10aac839c2ef4e89f55
Author: Andy Grove <[email protected]>
AuthorDate: Mon Jun 28 08:19:47 2021 -0600
Remove hard-coded PartitionMode from Ballista serde (#637)
---
ballista/rust/core/proto/ballista.proto | 7 ++++-
.../core/src/serde/physical_plan/from_proto.rs | 14 +++++++++-
ballista/rust/core/src/serde/physical_plan/mod.rs | 30 +++++++++++++++++-----
.../rust/core/src/serde/physical_plan/to_proto.rs | 7 ++++-
datafusion/src/physical_plan/hash_join.rs | 5 ++++
5 files changed, 53 insertions(+), 10 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index 2aa6102..e378806 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -565,12 +565,17 @@ message CsvScanExecNode {
repeated string filename = 8;
}
+enum PartitionMode {
+ COLLECT_LEFT = 0;
+ PARTITIONED = 1;
+}
+
message HashJoinExecNode {
PhysicalPlanNode left = 1;
PhysicalPlanNode right = 2;
repeated JoinOn on = 3;
JoinType join_type = 4;
-
+ PartitionMode partition_mode = 6;
}
message PhysicalColumn {
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 83cbdb4..717ee20 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -356,12 +356,24 @@ impl TryInto<Arc<dyn ExecutionPlan>> for
&protobuf::PhysicalPlanNode {
protobuf::JoinType::Semi => JoinType::Semi,
protobuf::JoinType::Anti => JoinType::Anti,
};
+ let partition_mode =
+ protobuf::PartitionMode::from_i32(hashjoin.partition_mode)
+ .ok_or_else(|| {
+ proto_error(format!(
+ "Received a HashJoinNode message with unknown
PartitionMode {}",
+ hashjoin.partition_mode
+ ))
+ })?;
+ let partition_mode = match partition_mode {
+ protobuf::PartitionMode::CollectLeft =>
PartitionMode::CollectLeft,
+ protobuf::PartitionMode::Partitioned =>
PartitionMode::Partitioned,
+ };
Ok(Arc::new(HashJoinExec::try_new(
left,
right,
on,
&join_type,
- PartitionMode::CollectLeft,
+ partition_mode,
)?))
}
PhysicalPlanType::ShuffleReader(shuffle_reader) => {
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs
b/ballista/rust/core/src/serde/physical_plan/mod.rs
index c0fe81f..a393d7f 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -88,13 +88,29 @@ mod roundtrip_tests {
Column::new("col", schema_right.index_of("col")?),
)];
- roundtrip_test(Arc::new(HashJoinExec::try_new(
- Arc::new(EmptyExec::new(false, Arc::new(schema_left))),
- Arc::new(EmptyExec::new(false, Arc::new(schema_right))),
- on,
- &JoinType::Inner,
- PartitionMode::CollectLeft,
- )?))
+ let schema_left = Arc::new(schema_left);
+ let schema_right = Arc::new(schema_right);
+ for join_type in &[
+ JoinType::Inner,
+ JoinType::Left,
+ JoinType::Right,
+ JoinType::Full,
+ JoinType::Anti,
+ JoinType::Semi,
+ ] {
+ for partition_mode in
+ &[PartitionMode::Partitioned, PartitionMode::CollectLeft]
+ {
+ roundtrip_test(Arc::new(HashJoinExec::try_new(
+ Arc::new(EmptyExec::new(false, schema_left.clone())),
+ Arc::new(EmptyExec::new(false, schema_right.clone())),
+ on.clone(),
+ &join_type,
+ *partition_mode,
+ )?))?;
+ }
+ }
+ Ok(())
}
#[test]
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 306abc1..0fc2785 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -34,7 +34,7 @@ use datafusion::physical_plan::expressions::{
use datafusion::physical_plan::expressions::{CastExpr, TryCastExpr};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_aggregate::AggregateMode;
-use datafusion::physical_plan::hash_join::HashJoinExec;
+use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::hash_utils::JoinType;
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion::physical_plan::parquet::ParquetExec;
@@ -143,6 +143,10 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn
ExecutionPlan> {
JoinType::Semi => protobuf::JoinType::Semi,
JoinType::Anti => protobuf::JoinType::Anti,
};
+ let partition_mode = match exec.partition_mode() {
+ PartitionMode::CollectLeft =>
protobuf::PartitionMode::CollectLeft,
+ PartitionMode::Partitioned =>
protobuf::PartitionMode::Partitioned,
+ };
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
protobuf::HashJoinExecNode {
@@ -150,6 +154,7 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn
ExecutionPlan> {
right: Some(Box::new(right)),
on,
join_type: join_type.into(),
+ partition_mode: partition_mode.into(),
},
))),
})
diff --git a/datafusion/src/physical_plan/hash_join.rs
b/datafusion/src/physical_plan/hash_join.rs
index eb5ceaf..195a19c 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -177,6 +177,11 @@ impl HashJoinExec {
&self.join_type
}
+ /// Borrow of the [`PartitionMode`] (`CollectLeft` or `Partitioned`) stored in `self.mode`
+ pub fn partition_mode(&self) -> &PartitionMode {
+ &self.mode
+ }
+
/// Calculates column indices and left/right placement on input / output
schemas and jointype
fn column_indices_from_schema(&self) -> ArrowResult<Vec<ColumnIndex>> {
let (primary_is_left, primary_schema, secondary_schema) = match
self.join_type {