This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 34c3355 add cross join support to ballista (#891)
34c3355 is described below
commit 34c33557b88c1461cd595f1d9bd72fee6897cde9
Author: QP Hou <[email protected]>
AuthorDate: Mon Aug 16 14:01:47 2021 -0700
add cross join support to ballista (#891)
---
ballista/rust/core/proto/ballista.proto | 14 +++++++++++++-
ballista/rust/core/src/serde/logical_plan/from_proto.rs | 11 +++++++++--
ballista/rust/core/src/serde/logical_plan/to_proto.rs | 13 ++++++++++++-
ballista/rust/core/src/serde/physical_plan/from_proto.rs | 7 +++++++
ballista/rust/core/src/serde/physical_plan/to_proto.rs | 12 ++++++++++++
datafusion/src/physical_plan/planner.rs | 5 +++--
6 files changed, 56 insertions(+), 6 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index a1608c6..12a27ba 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -249,9 +249,10 @@ message LogicalPlanNode {
RepartitionNode repartition = 9;
EmptyRelationNode empty_relation = 10;
CreateExternalTableNode create_external_table = 11;
- AnalyzeNode analyze = 14;
ExplainNode explain = 12;
WindowNode window = 13;
+ AnalyzeNode analyze = 14;
+ CrossJoinNode cross_join = 15;
}
}
@@ -399,6 +400,11 @@ message JoinNode {
repeated Column right_join_column = 6;
}
+message CrossJoinNode {
+ LogicalPlanNode left = 1;
+ LogicalPlanNode right = 2;
+}
+
message LimitNode {
LogicalPlanNode input = 1;
uint32 limit = 2;
@@ -432,6 +438,7 @@ message PhysicalPlanNode {
RepartitionExecNode repartition = 16;
WindowAggExecNode window = 17;
ShuffleWriterExecNode shuffle_writer = 18;
+ CrossJoinExecNode cross_join = 19;
}
}
@@ -593,6 +600,11 @@ message HashJoinExecNode {
PartitionMode partition_mode = 6;
}
+message CrossJoinExecNode {
+ PhysicalPlanNode left = 1;
+ PhysicalPlanNode right = 2;
+}
+
message PhysicalColumn {
string name = 1;
uint32 index = 2;
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index f9761a2..ade2cb4 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -41,8 +41,6 @@ use std::{
unimplemented,
};
-// use uuid::Uuid;
-
impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
type Error = BallistaError;
@@ -290,6 +288,15 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
builder.build().map_err(|e| e.into())
}
+ LogicalPlanType::CrossJoin(crossjoin) => {
+ let left = convert_box_required!(crossjoin.left)?;
+ let right = convert_box_required!(crossjoin.right)?;
+
+ LogicalPlanBuilder::from(left)
+ .cross_join(&right)?
+ .build()
+ .map_err(|e| e.into())
+ }
}
}
}
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index e1c7f53..5877ced 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -955,7 +955,18 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
}
LogicalPlan::Extension { .. } => unimplemented!(),
LogicalPlan::Union { .. } => unimplemented!(),
- LogicalPlan::CrossJoin { .. } => unimplemented!(),
+ LogicalPlan::CrossJoin { left, right, .. } => {
+ let left: protobuf::LogicalPlanNode = left.as_ref().try_into()?;
+ let right: protobuf::LogicalPlanNode = right.as_ref().try_into()?;
+ Ok(protobuf::LogicalPlanNode {
+ logical_plan_type: Some(LogicalPlanType::CrossJoin(Box::new(
+ protobuf::CrossJoinNode {
+ left: Some(Box::new(left)),
+ right: Some(Box::new(right)),
+ },
+ ))),
+ })
+ }
}
}
}
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 678bcde..46815db 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -51,6 +51,7 @@ use datafusion::physical_plan::window_functions::{
use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec};
use datafusion::physical_plan::{
coalesce_batches::CoalesceBatchesExec,
+ cross_join::CrossJoinExec,
csv::CsvExec,
empty::EmptyExec,
expressions::{
@@ -372,6 +373,12 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
partition_mode,
)?))
}
+ PhysicalPlanType::CrossJoin(crossjoin) => {
+ let left: Arc<dyn ExecutionPlan> = convert_box_required!(crossjoin.left)?;
+ let right: Arc<dyn ExecutionPlan> =
+ convert_box_required!(crossjoin.right)?;
+ Ok(Arc::new(CrossJoinExec::try_new(left, right)?))
+ }
PhysicalPlanType::ShuffleWriter(shuffle_writer) => {
let input: Arc<dyn ExecutionPlan> =
convert_box_required!(shuffle_writer.input)?;
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 48b2134..8d8f917 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -28,6 +28,7 @@ use std::{
use datafusion::logical_plan::JoinType;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion::physical_plan::cross_join::CrossJoinExec;
use datafusion::physical_plan::csv::CsvExec;
use datafusion::physical_plan::expressions::{
CaseExpr, InListExpr, IsNotNullExpr, IsNullExpr, NegativeExpr, NotExpr,
@@ -155,6 +156,17 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
},
))),
})
+ } else if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
+ let left: protobuf::PhysicalPlanNode = exec.left().to_owned().try_into()?;
+ let right: protobuf::PhysicalPlanNode = exec.right().to_owned().try_into()?;
+ Ok(protobuf::PhysicalPlanNode {
+ physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
+ protobuf::CrossJoinExecNode {
+ left: Some(Box::new(left)),
+ right: Some(Box::new(right)),
+ },
+ ))),
+ })
} else if let Some(exec) = plan.downcast_ref::<HashAggregateExec>() {
let groups = exec
.group_expr()
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 256a43b..02ab15d 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -19,8 +19,8 @@
use super::analyze::AnalyzeExec;
use super::{
- aggregates, cross_join::CrossJoinExec, empty::EmptyExec, expressions::binary,
- functions, hash_join::PartitionMode, udaf, union::UnionExec, windows,
+ aggregates, empty::EmptyExec, expressions::binary, functions,
+ hash_join::PartitionMode, udaf, union::UnionExec, windows,
};
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::{
@@ -29,6 +29,7 @@ use crate::logical_plan::{
UserDefinedLogicalNode,
};
use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::cross_join::CrossJoinExec;
use crate::physical_plan::explain::ExplainExec;
use crate::physical_plan::expressions;
use crate::physical_plan::expressions::{CaseExpr, Column, Literal, PhysicalSortExpr};