This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 57eeb64 [ARROW-12441] [DataFusion] Cross join implementation (#11)
57eeb64 is described below
commit 57eeb64659b9ca9c496a959f7716090fb32085b6
Author: Daniël Heres <[email protected]>
AuthorDate: Thu Apr 22 15:25:45 2021 +0200
[ARROW-12441] [DataFusion] Cross join implementation (#11)
* Cross join implementation
* Add to ballista, debug line
* Add to tpch test, format
* Simplify a bit
* Row-by-row processing for the left side to keep memory down
* Fix
* Fmt
* Clippy
* Fix doc, don't include as much debug info in memoryexec debug
* Use join
* Fix doc
* Add test cases with partitions
* Make clear that mutex is locked for very short amount of time
* Unwrap the lock
---
.../rust/core/src/serde/logical_plan/to_proto.rs | 1 +
benchmarks/src/bin/tpch.rs | 5 +
datafusion/README.md | 4 +-
datafusion/src/logical_plan/builder.rs | 10 +
datafusion/src/logical_plan/plan.rs | 27 +-
datafusion/src/optimizer/constant_folding.rs | 3 +-
datafusion/src/optimizer/filter_push_down.rs | 3 +-
datafusion/src/optimizer/hash_build_probe_order.rs | 27 ++
datafusion/src/optimizer/projection_push_down.rs | 1 +
datafusion/src/optimizer/utils.rs | 5 +
datafusion/src/physical_plan/cross_join.rs | 318 +++++++++++++++++++++
datafusion/src/physical_plan/hash_utils.rs | 5 -
datafusion/src/physical_plan/memory.rs | 10 +-
datafusion/src/physical_plan/mod.rs | 1 +
datafusion/src/physical_plan/planner.rs | 9 +-
datafusion/src/sql/planner.rs | 22 +-
datafusion/tests/sql.rs | 54 +++-
17 files changed, 479 insertions(+), 26 deletions(-)
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index a181f98..222b767 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -940,6 +940,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
}
LogicalPlan::Extension { .. } => unimplemented!(),
LogicalPlan::Union { .. } => unimplemented!(),
+ LogicalPlan::CrossJoin { .. } => unimplemented!(),
}
}
}
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 328a68d..b203ceb 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -1375,6 +1375,11 @@ mod tests {
}
#[tokio::test]
+    /// Runs TPC-H query 9 through `run_query`; added to the suite together
+    /// with cross join support.
+    async fn run_q9() -> Result<()> {
+        let query = 9;
+        run_query(query).await
+    }
+
+ #[tokio::test]
async fn run_q10() -> Result<()> {
run_query(10).await
}
diff --git a/datafusion/README.md b/datafusion/README.md
index 9e6b7a2..ff0b26d 100644
--- a/datafusion/README.md
+++ b/datafusion/README.md
@@ -213,7 +213,9 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
- [ ] MINUS
- [x] Joins
- [x] INNER JOIN
- - [ ] CROSS JOIN
+ - [x] LEFT JOIN
+ - [x] RIGHT JOIN
+ - [x] CROSS JOIN
- [ ] OUTER JOIN
- [ ] Window
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index fed82fd..b6017b7 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -270,6 +270,16 @@ impl LogicalPlanBuilder {
}))
}
}
+    /// Apply a cross join (Cartesian product) between this plan and `right`.
+    ///
+    /// The output schema combines the fields of this plan and of `right`
+    /// (computed with `DFSchema::join`).
+    ///
+    /// # Errors
+    /// Returns an error if the two input schemas cannot be joined.
+    pub fn cross_join(&self, right: &LogicalPlan) -> Result<Self> {
+        let joined_schema = self.plan.schema().join(right.schema())?;
+        let plan = LogicalPlan::CrossJoin {
+            left: Arc::new(self.plan.clone()),
+            right: Arc::new(right.clone()),
+            schema: DFSchemaRef::new(joined_schema),
+        };
+        Ok(Self::from(&plan))
+    }
/// Repartition
pub fn repartition(&self, partitioning_scheme: Partitioning) ->
Result<Self> {
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index d1b9b82..606ef1e 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -113,6 +113,15 @@ pub enum LogicalPlan {
/// The output schema, containing fields from the left and right inputs
schema: DFSchemaRef,
},
+ /// Apply Cross Join to two logical plans
+ CrossJoin {
+ /// Left input
+ left: Arc<LogicalPlan>,
+ /// Right input
+ right: Arc<LogicalPlan>,
+ /// The output schema, containing fields from the left and right inputs
+ schema: DFSchemaRef,
+ },
/// Repartition the plan based on a partitioning scheme.
Repartition {
/// The incoming logical plan
@@ -203,6 +212,7 @@ impl LogicalPlan {
LogicalPlan::Aggregate { schema, .. } => &schema,
LogicalPlan::Sort { input, .. } => input.schema(),
LogicalPlan::Join { schema, .. } => &schema,
+ LogicalPlan::CrossJoin { schema, .. } => &schema,
LogicalPlan::Repartition { input, .. } => input.schema(),
LogicalPlan::Limit { input, .. } => input.schema(),
LogicalPlan::CreateExternalTable { schema, .. } => &schema,
@@ -229,6 +239,11 @@ impl LogicalPlan {
right,
schema,
..
+ }
+ | LogicalPlan::CrossJoin {
+ left,
+ right,
+ schema,
} => {
let mut schemas = left.all_schemas();
schemas.extend(right.all_schemas());
@@ -290,8 +305,9 @@ impl LogicalPlan {
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::CreateExternalTable { .. }
- | LogicalPlan::Explain { .. } => vec![],
- LogicalPlan::Union { .. } => {
+ | LogicalPlan::CrossJoin { .. }
+ | LogicalPlan::Explain { .. }
+ | LogicalPlan::Union { .. } => {
vec![]
}
}
@@ -307,6 +323,7 @@ impl LogicalPlan {
LogicalPlan::Aggregate { input, .. } => vec![input],
LogicalPlan::Sort { input, .. } => vec![input],
LogicalPlan::Join { left, right, .. } => vec![left, right],
+ LogicalPlan::CrossJoin { left, right, .. } => vec![left, right],
LogicalPlan::Limit { input, .. } => vec![input],
LogicalPlan::Extension { node } => node.inputs(),
LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
@@ -396,7 +413,8 @@ impl LogicalPlan {
LogicalPlan::Repartition { input, .. } => input.accept(visitor)?,
LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?,
LogicalPlan::Sort { input, .. } => input.accept(visitor)?,
- LogicalPlan::Join { left, right, .. } => {
+ LogicalPlan::Join { left, right, .. }
+ | LogicalPlan::CrossJoin { left, right, .. } => {
left.accept(visitor)? && right.accept(visitor)?
}
LogicalPlan::Union { inputs, .. } => {
@@ -669,6 +687,9 @@ impl LogicalPlan {
keys.iter().map(|(l, r)| format!("{} = {}", l,
r)).collect();
write!(f, "Join: {}", join_expr.join(", "))
}
+ LogicalPlan::CrossJoin { .. } => {
+ write!(f, "CrossJoin:")
+ }
LogicalPlan::Repartition {
partitioning_scheme,
..
diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs
index d63177b..71c84f6 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -72,7 +72,8 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Explain { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::Union { .. }
- | LogicalPlan::Join { .. } => {
+ | LogicalPlan::Join { .. }
+ | LogicalPlan::CrossJoin { .. } => {
// apply the optimization to all inputs of the plan
let inputs = plan.inputs();
let new_inputs = inputs
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index ec260a4..4622e9f 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -314,7 +314,8 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
.collect::<HashSet<_>>();
issue_filters(state, used_columns, plan)
}
- LogicalPlan::Join { left, right, .. } => {
+ LogicalPlan::Join { left, right, .. }
+ | LogicalPlan::CrossJoin { left, right, .. } => {
let (pushable_to_left, pushable_to_right, keep) =
get_join_predicates(&state, &left.schema(), &right.schema());
diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs b/datafusion/src/optimizer/hash_build_probe_order.rs
index f44050f..086e2f0 100644
--- a/datafusion/src/optimizer/hash_build_probe_order.rs
+++ b/datafusion/src/optimizer/hash_build_probe_order.rs
@@ -67,6 +67,10 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
// we cannot predict the cardinality of the join output
None
}
+ LogicalPlan::CrossJoin { left, right, .. } => {
+ // number of rows is equal to num_left * num_right
+ get_num_rows(left).and_then(|x| get_num_rows(right).map(|y| x * y))
+ }
LogicalPlan::Repartition { .. } => {
// we cannot predict how rows will be repartitioned
None
@@ -138,6 +142,29 @@ impl OptimizerRule for HashBuildProbeOrder {
})
}
}
+ LogicalPlan::CrossJoin {
+ left,
+ right,
+ schema,
+ } => {
+ let left = self.optimize(left)?;
+ let right = self.optimize(right)?;
+ if should_swap_join_order(&left, &right) {
+ // Swap left and right
+ Ok(LogicalPlan::CrossJoin {
+ left: Arc::new(right),
+ right: Arc::new(left),
+ schema: schema.clone(),
+ })
+ } else {
+ // Keep join as is
+ Ok(LogicalPlan::CrossJoin {
+ left: Arc::new(left),
+ right: Arc::new(right),
+ schema: schema.clone(),
+ })
+ }
+ }
// Rest: recurse into plan, apply optimization where possible
LogicalPlan::Projection { .. }
| LogicalPlan::Aggregate { .. }
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index 6b1cdfe..7243fa5 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -270,6 +270,7 @@ fn optimize_plan(
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Union { .. }
+ | LogicalPlan::CrossJoin { .. }
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
// collect all required columns by this plan
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index fe1d023..0ec3fa7 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -208,6 +208,11 @@ pub fn from_plan(
on: on.clone(),
schema: schema.clone(),
}),
+ LogicalPlan::CrossJoin { schema, .. } => Ok(LogicalPlan::CrossJoin {
+ left: Arc::new(inputs[0].clone()),
+ right: Arc::new(inputs[1].clone()),
+ schema: schema.clone(),
+ }),
LogicalPlan::Limit { n, .. } => Ok(LogicalPlan::Limit {
n: *n,
input: Arc::new(inputs[0].clone()),
diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs
new file mode 100644
index 0000000..4372352
--- /dev/null
+++ b/datafusion/src/physical_plan/cross_join.rs
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the cross join plan for loading the left side of the cross join
+//! and producing batches in parallel for the right partitions
+
+use futures::{lock::Mutex, StreamExt};
+use std::{any::Any, sync::Arc, task::Poll};
+
+use crate::physical_plan::memory::MemoryStream;
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+
+use futures::{Stream, TryStreamExt};
+
+use super::{hash_utils::check_join_is_valid, merge::MergeExec};
+use crate::{
+ error::{DataFusionError, Result},
+ scalar::ScalarValue,
+};
+use async_trait::async_trait;
+use std::time::Instant;
+
+use super::{ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
+use log::debug;
+
+/// Data of the left side: every batch of the left input concatenated into a
+/// single [RecordBatch]
+type JoinLeftData = RecordBatch;
+
+/// Execution plan for a cross join (Cartesian product) of two inputs.
+///
+/// The left side is loaded into memory (merged into a single batch) by the
+/// first successful `execute` call and cached in `build_side`. Every
+/// partition of the right side is then combined with all rows of the left
+/// side, so there is one output partition per right partition.
+#[derive(Debug)]
+pub struct CrossJoinExec {
+    /// left (build) side which gets loaded in memory
+    left: Arc<dyn ExecutionPlan>,
+    /// right (probe) side whose rows are combined with every left row
+    right: Arc<dyn ExecutionPlan>,
+    /// The schema once the join is applied (left fields, then right fields)
+    schema: SchemaRef,
+    /// Build-side data. It is computed lazily by `execute` and shared by all
+    /// partitions. The async mutex makes concurrently executing partitions
+    /// wait for it instead of loading the left side again.
+    build_side: Arc<Mutex<Option<JoinLeftData>>>,
+}
+
+impl CrossJoinExec {
+    /// Tries to create a new [CrossJoinExec].
+    ///
+    /// The output schema consists of all fields of `left` followed by all
+    /// fields of `right`.
+    ///
+    /// # Error
+    /// This function errors when left and right schemas can't be combined
+    /// (as reported by `check_join_is_valid`).
+    pub fn try_new(
+        left: Arc<dyn ExecutionPlan>,
+        right: Arc<dyn ExecutionPlan>,
+    ) -> Result<Self> {
+        let left_schema = left.schema();
+        let right_schema = right.schema();
+        // A cross join has no join keys, hence the empty `on` list
+        check_join_is_valid(&left_schema, &right_schema, &[])?;
+
+        // left then right
+        let all_columns = left_schema
+            .fields()
+            .iter()
+            .chain(right_schema.fields().iter())
+            .cloned()
+            .collect();
+        let schema = Arc::new(Schema::new(all_columns));
+
+        Ok(CrossJoinExec {
+            left,
+            right,
+            schema,
+            build_side: Arc::new(Mutex::new(None)),
+        })
+    }
+
+    /// left (build) side which gets loaded in memory
+    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.left
+    }
+
+    /// right side which gets combined with left side
+    pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.right
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for CrossJoinExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.left.clone(), self.right.clone()]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            2 => Ok(Arc::new(CrossJoinExec::try_new(
+                children[0].clone(),
+                children[1].clone(),
+            )?)),
+            _ => Err(DataFusionError::Internal(
+                "CrossJoinExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    /// Output partitioning follows the right input: each right partition
+    /// produces one output partition.
+    fn output_partitioning(&self) -> Partitioning {
+        self.right.output_partitioning()
+    }
+
+    /// Joins right partition `partition` with the whole left side.
+    ///
+    /// The first call loads and caches the left side. Later calls, including
+    /// concurrent ones waiting on the async mutex, reuse it. If loading
+    /// fails, nothing is cached and the error is returned.
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        // we only want to compute the build side once; the lock is held while
+        // loading so other partitions wait instead of loading it again
+        let left_data = {
+            let mut build_side = self.build_side.lock().await;
+
+            match build_side.as_ref() {
+                Some(batch) => batch.clone(),
+                None => {
+                    let start = Instant::now();
+
+                    // merge all left parts into a single stream
+                    let merge = MergeExec::new(self.left.clone());
+                    let stream = merge.execute(0).await?;
+
+                    // Load all batches and count the rows
+                    let (batches, num_rows) = stream
+                        .try_fold((Vec::new(), 0usize), |mut acc, batch| async {
+                            acc.1 += batch.num_rows();
+                            acc.0.push(batch);
+                            Ok(acc)
+                        })
+                        .await?;
+                    let merged_batch =
+                        concat_batches(&self.left.schema(), &batches, num_rows)?;
+                    *build_side = Some(merged_batch.clone());
+
+                    debug!(
+                        "Built build-side of cross join containing {} rows in {} ms",
+                        num_rows,
+                        start.elapsed().as_millis()
+                    );
+
+                    merged_batch
+                }
+            }
+        };
+
+        // An empty left side always yields an empty result, so return early
+        // without starting the right input at all.
+        if left_data.num_rows() == 0 {
+            return Ok(Box::pin(MemoryStream::try_new(
+                vec![],
+                self.schema.clone(),
+                None,
+            )?));
+        }
+
+        let stream = self.right.execute(partition).await?;
+
+        Ok(Box::pin(CrossJoinStream {
+            schema: self.schema.clone(),
+            left_data,
+            right: stream,
+            right_batch: Arc::new(std::sync::Mutex::new(None)),
+            left_index: 0,
+            num_input_batches: 0,
+            num_input_rows: 0,
+            num_output_batches: 0,
+            num_output_rows: 0,
+            join_time: 0,
+        }))
+    }
+}
+
+/// A stream that issues [RecordBatch]es as they arrive from the right of the
+/// join, pairing each right batch with every row of the left side.
+struct CrossJoinStream {
+    /// Output schema of the join (left fields followed by right fields)
+    schema: Arc<Schema>,
+    /// data from the left side
+    left_data: JoinLeftData,
+    /// right input stream
+    right: SendableRecordBatchStream,
+    /// Index of the next left row to pair with `right_batch`; 0 means the
+    /// next right batch has to be fetched
+    left_index: usize,
+    /// Current batch being processed from the right side
+    // NOTE(review): the stream is only accessed through `Pin<&mut Self>`
+    // (exclusive access), so a plain `Option<RecordBatch>` would suffice.
+    right_batch: Arc<std::sync::Mutex<Option<RecordBatch>>>,
+    /// number of input batches
+    num_input_batches: usize,
+    /// number of input rows
+    num_input_rows: usize,
+    /// number of batches produced
+    num_output_batches: usize,
+    /// number of rows produced
+    num_output_rows: usize,
+    /// total time for joining probe-side batches to the build-side batches,
+    /// accumulated in whole milliseconds per output batch (the value is
+    /// truncated, so sub-millisecond batches add 0)
+    join_time: usize,
+}
+
+impl RecordBatchStream for CrossJoinStream {
+    /// Output schema of the cross join, shared with the plan that created
+    /// this stream.
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+/// Builds one output batch by pairing row `left_index` of `left_data` with
+/// every row of the right-side `batch`.
+///
+/// Each left column becomes an array of `batch.num_rows()` copies of the
+/// selected value. The right columns are appended unchanged, and the result
+/// is wrapped in a batch with the given `schema`.
+///
+/// Errors converting a left value to a scalar are returned as Arrow
+/// external errors.
+fn build_batch(
+    left_index: usize,
+    batch: &RecordBatch,
+    left_data: &RecordBatch,
+    schema: &SchemaRef,
+) -> ArrowResult<RecordBatch> {
+    // Repeat the selected left value once per right row
+    let left_arrays = left_data
+        .columns()
+        .iter()
+        .map(|arr| {
+            let scalar = ScalarValue::try_from_array(arr, left_index)?;
+            Ok(scalar.to_array_of_size(batch.num_rows()))
+        })
+        .collect::<Result<Vec<_>>>()
+        .map_err(|x| x.into_arrow_external_error())?;
+
+    // Share the schema Arc instead of deep-copying the Schema for every
+    // output batch (this runs once per left row per right batch).
+    RecordBatch::try_new(
+        Arc::clone(schema),
+        left_arrays
+            .into_iter()
+            .chain(batch.columns().iter().cloned())
+            .collect(),
+    )
+}
+
+impl Stream for CrossJoinStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    /// Produces one output batch per (left row, right batch) pair.
+    ///
+    /// A batch arriving from the right input is immediately joined with left
+    /// row 0 and stored in `right_batch`. The following polls replay it
+    /// against left rows `1..left_data.num_rows()` before the next right
+    /// batch is requested.
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        // Still replaying the stored right batch against the remaining left rows
+        if self.left_index > 0 && self.left_index < self.left_data.num_rows() {
+            let start = Instant::now();
+            // The lock is held only while cloning the stored batch
+            let right_batch = {
+                let right_batch = self.right_batch.lock().unwrap();
+                right_batch
+                    .clone()
+                    .expect("a right batch is stored whenever left_index > 0")
+            };
+            let result = build_batch(
+                self.left_index,
+                &right_batch,
+                &self.left_data,
+                &self.schema,
+            );
+            // The rows of this right batch were already counted as input when
+            // it arrived; counting them again would inflate `num_input_rows`.
+            if let Ok(ref batch) = result {
+                self.join_time += start.elapsed().as_millis() as usize;
+                self.num_output_batches += 1;
+                self.num_output_rows += batch.num_rows();
+            }
+            self.left_index += 1;
+            return Poll::Ready(Some(result));
+        }
+
+        // Stored batch exhausted (or none yet): fetch the next right batch
+        self.left_index = 0;
+        self.right
+            .poll_next_unpin(cx)
+            .map(|maybe_batch| match maybe_batch {
+                Some(Ok(batch)) => {
+                    let start = Instant::now();
+                    let result = build_batch(
+                        self.left_index,
+                        &batch,
+                        &self.left_data,
+                        &self.schema,
+                    );
+                    self.num_input_batches += 1;
+                    self.num_input_rows += batch.num_rows();
+                    if let Ok(ref batch) = result {
+                        self.join_time += start.elapsed().as_millis() as usize;
+                        self.num_output_batches += 1;
+                        self.num_output_rows += batch.num_rows();
+                    }
+                    // left row 0 is done; the next poll continues with row 1
+                    self.left_index = 1;
+
+                    let mut right_batch = self.right_batch.lock().unwrap();
+                    *right_batch = Some(batch);
+
+                    Some(result)
+                }
+                // End of input or an error: log the metrics and pass it through
+                other => {
+                    debug!(
+                        "Processed {} probe-side input batches containing {} rows and \
+                         produced {} output batches containing {} rows in {} ms",
+                        self.num_input_batches,
+                        self.num_input_rows,
+                        self.num_output_batches,
+                        self.num_output_rows,
+                        self.join_time
+                    );
+                    other
+                }
+            })
+    }
+}
diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs
index b26ff9b..a38cc09 100644
--- a/datafusion/src/physical_plan/hash_utils.rs
+++ b/datafusion/src/physical_plan/hash_utils.rs
@@ -52,11 +52,6 @@ fn check_join_set_is_valid(
right: &HashSet<String>,
on: &JoinOn,
) -> Result<()> {
- if on.is_empty() {
- return Err(DataFusionError::Plan(
- "The 'on' clause of a join cannot be empty".to_string(),
- ));
- }
let on_left = &on.iter().map(|on|
on.0.to_string()).collect::<HashSet<_>>();
let left_missing = on_left.difference(left).collect::<HashSet<_>>();
diff --git a/datafusion/src/physical_plan/memory.rs b/datafusion/src/physical_plan/memory.rs
index bef9bcc..9022077 100644
--- a/datafusion/src/physical_plan/memory.rs
+++ b/datafusion/src/physical_plan/memory.rs
@@ -17,6 +17,7 @@
//! Execution plan for reading in-memory batches of data
+use core::fmt;
use std::any::Any;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -31,7 +32,6 @@ use async_trait::async_trait;
use futures::Stream;
/// Execution plan for reading in-memory batches of data
-#[derive(Debug)]
pub struct MemoryExec {
/// The partitions to query
partitions: Vec<Vec<RecordBatch>>,
@@ -41,6 +41,14 @@ pub struct MemoryExec {
projection: Option<Vec<usize>>,
}
+impl fmt::Debug for MemoryExec {
+    /// Formats the plan without dumping the in-memory partitions. The batches
+    /// are replaced by `[...]`; schema and projection are printed in full.
+    /// Uses `debug_struct` so fields are separated and `{:#?}` works.
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("MemoryExec")
+            .field("partitions", &format_args!("[...]"))
+            .field("schema", &self.schema)
+            .field("projection", &self.projection)
+            .finish()
+    }
+}
+
#[async_trait]
impl ExecutionPlan for MemoryExec {
/// Return a reference to Any that can be used for downcasting
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 80dfe6e..11f0946 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -345,6 +345,7 @@ pub mod aggregates;
pub mod array_expressions;
pub mod coalesce_batches;
pub mod common;
+pub mod cross_join;
#[cfg(feature = "crypto_expressions")]
pub mod crypto_expressions;
pub mod csv;
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index f9279ae..ae6ad50 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -20,8 +20,8 @@
use std::sync::Arc;
use super::{
- aggregates, empty::EmptyExec, expressions::binary, functions,
- hash_join::PartitionMode, udaf, union::UnionExec,
+ aggregates, cross_join::CrossJoinExec, empty::EmptyExec,
expressions::binary,
+ functions, hash_join::PartitionMode, udaf, union::UnionExec,
};
use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionContextState;
@@ -328,6 +328,11 @@ impl DefaultPhysicalPlanner {
)?))
}
}
+ LogicalPlan::CrossJoin { left, right, .. } => {
+ let left = self.create_initial_plan(left, ctx_state)?;
+ let right = self.create_initial_plan(right, ctx_state)?;
+ Ok(Arc::new(CrossJoinExec::try_new(left, right)?))
+ }
LogicalPlan::EmptyRelation {
produce_one_row,
schema,
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index f3cba23..a40d0be 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -355,12 +355,20 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
JoinOperator::Inner(constraint) => {
self.parse_join(left, &right, constraint, JoinType::Inner)
}
+ JoinOperator::CrossJoin => self.parse_cross_join(left, &right),
other => Err(DataFusionError::NotImplemented(format!(
"Unsupported JOIN operator {:?}",
other
))),
}
}
+    /// Plans `left CROSS JOIN right` as a cross join of the two inputs via
+    /// `LogicalPlanBuilder::cross_join`.
+    fn parse_cross_join(
+        &self,
+        left: &LogicalPlan,
+        right: &LogicalPlan,
+    ) -> Result<LogicalPlan> {
+        // `left` and `right` are already references; no extra borrow needed
+        LogicalPlanBuilder::from(left).cross_join(right)?.build()
+    }
fn parse_join(
&self,
@@ -489,9 +497,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}
if join_keys.is_empty() {
- return Err(DataFusionError::NotImplemented(
- "Cartesian joins are not supported".to_string(),
- ));
+ left =
+
LogicalPlanBuilder::from(&left).cross_join(right)?.build()?;
} else {
let left_keys: Vec<_> =
join_keys.iter().map(|(l, _)| *l).collect();
@@ -517,9 +524,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if plans.len() == 1 {
Ok(plans[0].clone())
} else {
- Err(DataFusionError::NotImplemented(
- "Cartesian joins are not supported".to_string(),
- ))
+ let mut left = plans[0].clone();
+ for right in plans.iter().skip(1) {
+ left =
+
LogicalPlanBuilder::from(&left).cross_join(right)?.build()?;
+ }
+ Ok(left)
}
}
};
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index f4d4e65..70baffc 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -1289,15 +1289,57 @@ async fn equijoin_implicit_syntax_reversed() -> Result<()> {
}
#[tokio::test]
-async fn cartesian_join() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id")?;
+async fn cross_join() {
+    let mut ctx = create_join_context("t1_id", "t2_id").unwrap();
+
+    // Implicit cross join (comma syntax, no join keys)
let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 ORDER BY t1_id";
- let maybe_plan = ctx.create_logical_plan(&sql);
+    let actual = execute(&mut ctx, sql).await;
+
+    assert_eq!(4 * 4, actual.len());
+
+    // Implicit cross join with a filter that contains no join keys
+    let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE 1=1 ORDER BY t1_id";
+    let actual = execute(&mut ctx, sql).await;
+
+    assert_eq!(4 * 4, actual.len());
+
+    // Explicit CROSS JOIN, ordered on both sides so the expected rows do not
+    // depend on the order in which the join emits them
+    let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id, t2_name";
+    let actual = execute(&mut ctx, sql).await;
+
+    assert_eq!(4 * 4, actual.len());
+
assert_eq!(
- "This feature is not implemented: Cartesian joins are not supported",
- &format!("{}", maybe_plan.err().unwrap())
+        actual,
+        [
+            ["11", "a", "w"],
+            ["11", "a", "x"],
+            ["11", "a", "y"],
+            ["11", "a", "z"],
+            ["22", "b", "w"],
+            ["22", "b", "x"],
+            ["22", "b", "y"],
+            ["22", "b", "z"],
+            ["33", "c", "w"],
+            ["33", "c", "x"],
+            ["33", "c", "y"],
+            ["33", "c", "z"],
+            ["44", "d", "w"],
+            ["44", "d", "x"],
+            ["44", "d", "y"],
+            ["44", "d", "z"]
+        ]
);
- Ok(())
+
+    // Two partitions (from UNION) on the left
+    let sql = "SELECT * FROM (SELECT t1_id, t1_name FROM t1 UNION ALL SELECT t1_id, t1_name FROM t1) t1 CROSS JOIN t2";
+    let actual = execute(&mut ctx, sql).await;
+
+    assert_eq!(4 * 4 * 2, actual.len());
+
+    // Two partitions (from UNION) on the right
+    let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN (SELECT t2_name FROM t2 UNION ALL SELECT t2_name FROM t2)";
+    let actual = execute(&mut ctx, sql).await;
+
+    assert_eq!(4 * 4 * 2, actual.len());
}
fn create_join_context(