This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 57eeb64  [ARROW-12441] [DataFusion] Cross join implementation (#11)
57eeb64 is described below

commit 57eeb64659b9ca9c496a959f7716090fb32085b6
Author: Daniël Heres <[email protected]>
AuthorDate: Thu Apr 22 15:25:45 2021 +0200

    [ARROW-12441] [DataFusion] Cross join implementation (#11)
    
    * Cross join implementation
    
    * Add to ballista, debug line
    
    * Add to tpch test, format
    
    * Simplify a bit
    
    * Row-by-row processing for the left side to keep memory down
    
    * Fix
    
    * Fmt
    
    * Clippy
    
    * Fix doc, don't include as much debug info in memoryexec debug
    
    * Use join
    
    * Fix doc
    
    * Add test cases with partitions
    
    * Make clear that mutex is locked for very short amount of time
    
    * Unwrap the lock
---
 .../rust/core/src/serde/logical_plan/to_proto.rs   |   1 +
 benchmarks/src/bin/tpch.rs                         |   5 +
 datafusion/README.md                               |   4 +-
 datafusion/src/logical_plan/builder.rs             |  10 +
 datafusion/src/logical_plan/plan.rs                |  27 +-
 datafusion/src/optimizer/constant_folding.rs       |   3 +-
 datafusion/src/optimizer/filter_push_down.rs       |   3 +-
 datafusion/src/optimizer/hash_build_probe_order.rs |  27 ++
 datafusion/src/optimizer/projection_push_down.rs   |   1 +
 datafusion/src/optimizer/utils.rs                  |   5 +
 datafusion/src/physical_plan/cross_join.rs         | 318 +++++++++++++++++++++
 datafusion/src/physical_plan/hash_utils.rs         |   5 -
 datafusion/src/physical_plan/memory.rs             |  10 +-
 datafusion/src/physical_plan/mod.rs                |   1 +
 datafusion/src/physical_plan/planner.rs            |   9 +-
 datafusion/src/sql/planner.rs                      |  22 +-
 datafusion/tests/sql.rs                            |  54 +++-
 17 files changed, 479 insertions(+), 26 deletions(-)

diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index a181f98..222b767 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -940,6 +940,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
             }
             LogicalPlan::Extension { .. } => unimplemented!(),
             LogicalPlan::Union { .. } => unimplemented!(),
+            LogicalPlan::CrossJoin { .. } => unimplemented!(),
         }
     }
 }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 328a68d..b203ceb 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -1375,6 +1375,11 @@ mod tests {
     }
 
     #[tokio::test]
+    async fn run_q9() -> Result<()> {
+        run_query(9).await
+    }
+
+    #[tokio::test]
     async fn run_q10() -> Result<()> {
         run_query(10).await
     }
diff --git a/datafusion/README.md b/datafusion/README.md
index 9e6b7a2..ff0b26d 100644
--- a/datafusion/README.md
+++ b/datafusion/README.md
@@ -213,7 +213,9 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
   - [ ] MINUS
 - [x] Joins
   - [x] INNER JOIN
-  - [ ] CROSS JOIN
+  - [x] LEFT JOIN
+  - [x] RIGHT JOIN
+  - [x] CROSS JOIN
   - [ ] OUTER JOIN
 - [ ] Window
 
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index fed82fd..b6017b7 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -270,6 +270,16 @@ impl LogicalPlanBuilder {
             }))
         }
     }
+    /// Apply a cross join
+    pub fn cross_join(&self, right: &LogicalPlan) -> Result<Self> {
+        let schema = self.plan.schema().join(right.schema())?;
+
+        Ok(Self::from(&LogicalPlan::CrossJoin {
+            left: Arc::new(self.plan.clone()),
+            right: Arc::new(right.clone()),
+            schema: DFSchemaRef::new(schema),
+        }))
+    }
 
     /// Repartition
     pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result<Self> {
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index d1b9b82..606ef1e 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -113,6 +113,15 @@ pub enum LogicalPlan {
         /// The output schema, containing fields from the left and right inputs
         schema: DFSchemaRef,
     },
+    /// Apply Cross Join to two logical plans
+    CrossJoin {
+        /// Left input
+        left: Arc<LogicalPlan>,
+        /// Right input
+        right: Arc<LogicalPlan>,
+        /// The output schema, containing fields from the left and right inputs
+        schema: DFSchemaRef,
+    },
     /// Repartition the plan based on a partitioning scheme.
     Repartition {
         /// The incoming logical plan
@@ -203,6 +212,7 @@ impl LogicalPlan {
             LogicalPlan::Aggregate { schema, .. } => &schema,
             LogicalPlan::Sort { input, .. } => input.schema(),
             LogicalPlan::Join { schema, .. } => &schema,
+            LogicalPlan::CrossJoin { schema, .. } => &schema,
             LogicalPlan::Repartition { input, .. } => input.schema(),
             LogicalPlan::Limit { input, .. } => input.schema(),
             LogicalPlan::CreateExternalTable { schema, .. } => &schema,
@@ -229,6 +239,11 @@ impl LogicalPlan {
                 right,
                 schema,
                 ..
+            }
+            | LogicalPlan::CrossJoin {
+                left,
+                right,
+                schema,
             } => {
                 let mut schemas = left.all_schemas();
                 schemas.extend(right.all_schemas());
@@ -290,8 +305,9 @@ impl LogicalPlan {
             | LogicalPlan::EmptyRelation { .. }
             | LogicalPlan::Limit { .. }
             | LogicalPlan::CreateExternalTable { .. }
-            | LogicalPlan::Explain { .. } => vec![],
-            LogicalPlan::Union { .. } => {
+            | LogicalPlan::CrossJoin { .. }
+            | LogicalPlan::Explain { .. }
+            | LogicalPlan::Union { .. } => {
                 vec![]
             }
         }
@@ -307,6 +323,7 @@ impl LogicalPlan {
             LogicalPlan::Aggregate { input, .. } => vec![input],
             LogicalPlan::Sort { input, .. } => vec![input],
             LogicalPlan::Join { left, right, .. } => vec![left, right],
+            LogicalPlan::CrossJoin { left, right, .. } => vec![left, right],
             LogicalPlan::Limit { input, .. } => vec![input],
             LogicalPlan::Extension { node } => node.inputs(),
             LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
@@ -396,7 +413,8 @@ impl LogicalPlan {
             LogicalPlan::Repartition { input, .. } => input.accept(visitor)?,
             LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?,
             LogicalPlan::Sort { input, .. } => input.accept(visitor)?,
-            LogicalPlan::Join { left, right, .. } => {
+            LogicalPlan::Join { left, right, .. }
+            | LogicalPlan::CrossJoin { left, right, .. } => {
                 left.accept(visitor)? && right.accept(visitor)?
             }
             LogicalPlan::Union { inputs, .. } => {
@@ -669,6 +687,9 @@ impl LogicalPlan {
                             keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect();
                         write!(f, "Join: {}", join_expr.join(", "))
                     }
+                    LogicalPlan::CrossJoin { .. } => {
+                        write!(f, "CrossJoin:")
+                    }
                     LogicalPlan::Repartition {
                         partitioning_scheme,
                         ..
diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs
index d63177b..71c84f6 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -72,7 +72,8 @@ impl OptimizerRule for ConstantFolding {
             | LogicalPlan::Explain { .. }
             | LogicalPlan::Limit { .. }
             | LogicalPlan::Union { .. }
-            | LogicalPlan::Join { .. } => {
+            | LogicalPlan::Join { .. }
+            | LogicalPlan::CrossJoin { .. } => {
                 // apply the optimization to all inputs of the plan
                 let inputs = plan.inputs();
                 let new_inputs = inputs
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index ec260a4..4622e9f 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -314,7 +314,8 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
                 .collect::<HashSet<_>>();
             issue_filters(state, used_columns, plan)
         }
-        LogicalPlan::Join { left, right, .. } => {
+        LogicalPlan::Join { left, right, .. }
+        | LogicalPlan::CrossJoin { left, right, .. } => {
             let (pushable_to_left, pushable_to_right, keep) =
                 get_join_predicates(&state, &left.schema(), &right.schema());
 
diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs b/datafusion/src/optimizer/hash_build_probe_order.rs
index f44050f..086e2f0 100644
--- a/datafusion/src/optimizer/hash_build_probe_order.rs
+++ b/datafusion/src/optimizer/hash_build_probe_order.rs
@@ -67,6 +67,10 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
             // we cannot predict the cardinality of the join output
             None
         }
+        LogicalPlan::CrossJoin { left, right, .. } => {
+            // number of rows is equal to num_left * num_right
+            get_num_rows(left).and_then(|x| get_num_rows(right).map(|y| x * y))
+        }
         LogicalPlan::Repartition { .. } => {
             // we cannot predict how rows will be repartitioned
             None
@@ -138,6 +142,29 @@ impl OptimizerRule for HashBuildProbeOrder {
                     })
                 }
             }
+            LogicalPlan::CrossJoin {
+                left,
+                right,
+                schema,
+            } => {
+                let left = self.optimize(left)?;
+                let right = self.optimize(right)?;
+                if should_swap_join_order(&left, &right) {
+                    // Swap left and right
+                    Ok(LogicalPlan::CrossJoin {
+                        left: Arc::new(right),
+                        right: Arc::new(left),
+                        schema: schema.clone(),
+                    })
+                } else {
+                    // Keep join as is
+                    Ok(LogicalPlan::CrossJoin {
+                        left: Arc::new(left),
+                        right: Arc::new(right),
+                        schema: schema.clone(),
+                    })
+                }
+            }
             // Rest: recurse into plan, apply optimization where possible
             LogicalPlan::Projection { .. }
             | LogicalPlan::Aggregate { .. }
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index 6b1cdfe..7243fa5 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -270,6 +270,7 @@ fn optimize_plan(
         | LogicalPlan::Sort { .. }
         | LogicalPlan::CreateExternalTable { .. }
         | LogicalPlan::Union { .. }
+        | LogicalPlan::CrossJoin { .. }
         | LogicalPlan::Extension { .. } => {
             let expr = plan.expressions();
             // collect all required columns by this plan
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index fe1d023..0ec3fa7 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -208,6 +208,11 @@ pub fn from_plan(
             on: on.clone(),
             schema: schema.clone(),
         }),
+        LogicalPlan::CrossJoin { schema, .. } => Ok(LogicalPlan::CrossJoin {
+            left: Arc::new(inputs[0].clone()),
+            right: Arc::new(inputs[1].clone()),
+            schema: schema.clone(),
+        }),
         LogicalPlan::Limit { n, .. } => Ok(LogicalPlan::Limit {
             n: *n,
             input: Arc::new(inputs[0].clone()),
diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs
new file mode 100644
index 0000000..4372352
--- /dev/null
+++ b/datafusion/src/physical_plan/cross_join.rs
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the cross join plan for loading the left side of the cross join
+//! and producing batches in parallel for the right partitions
+
+use futures::{lock::Mutex, StreamExt};
+use std::{any::Any, sync::Arc, task::Poll};
+
+use crate::physical_plan::memory::MemoryStream;
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+
+use futures::{Stream, TryStreamExt};
+
+use super::{hash_utils::check_join_is_valid, merge::MergeExec};
+use crate::{
+    error::{DataFusionError, Result},
+    scalar::ScalarValue,
+};
+use async_trait::async_trait;
+use std::time::Instant;
+
+use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
+use log::debug;
+
+/// Data of the left side
+type JoinLeftData = RecordBatch;
+
+/// executes partitions in parallel and combines them into a set of
+/// partitions by combining all values from the left with all values on the right
+#[derive(Debug)]
+pub struct CrossJoinExec {
+    /// left (build) side which gets loaded in memory
+    left: Arc<dyn ExecutionPlan>,
+    /// right (probe) side which are combined with left side
+    right: Arc<dyn ExecutionPlan>,
+    /// The schema once the join is applied
+    schema: SchemaRef,
+    /// Build-side data
+    build_side: Arc<Mutex<Option<JoinLeftData>>>,
+}
+
+impl CrossJoinExec {
+    /// Tries to create a new [CrossJoinExec].
+    /// # Error
+    /// This function errors when left and right schema's can't be combined
+    pub fn try_new(
+        left: Arc<dyn ExecutionPlan>,
+        right: Arc<dyn ExecutionPlan>,
+    ) -> Result<Self> {
+        let left_schema = left.schema();
+        let right_schema = right.schema();
+        check_join_is_valid(&left_schema, &right_schema, &[])?;
+
+        let left_schema = left.schema();
+        let left_fields = left_schema.fields().iter();
+        let right_schema = right.schema();
+
+        let right_fields = right_schema.fields().iter();
+
+        // left then right
+        let all_columns = left_fields.chain(right_fields).cloned().collect();
+
+        let schema = Arc::new(Schema::new(all_columns));
+
+        Ok(CrossJoinExec {
+            left,
+            right,
+            schema,
+            build_side: Arc::new(Mutex::new(None)),
+        })
+    }
+
+    /// left (build) side which gets loaded in memory
+    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.left
+    }
+
+    /// right side which gets combined with left side
+    pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.right
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for CrossJoinExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.left.clone(), self.right.clone()]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            2 => Ok(Arc::new(CrossJoinExec::try_new(
+                children[0].clone(),
+                children[1].clone(),
+            )?)),
+            _ => Err(DataFusionError::Internal(
+                "CrossJoinExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.right.output_partitioning()
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        // we only want to compute the build side once
+        let left_data = {
+            let mut build_side = self.build_side.lock().await;
+
+            match build_side.as_ref() {
+                Some(stream) => stream.clone(),
+                None => {
+                    let start = Instant::now();
+
+                    // merge all left parts into a single stream
+                    let merge = MergeExec::new(self.left.clone());
+                    let stream = merge.execute(0).await?;
+
+                    // Load all batches and count the rows
+                    let (batches, num_rows) = stream
+                        .try_fold((Vec::new(), 0usize), |mut acc, batch| async {
+                            acc.1 += batch.num_rows();
+                            acc.0.push(batch);
+                            Ok(acc)
+                        })
+                        .await?;
+                    let merged_batch =
+                        concat_batches(&self.left.schema(), &batches, num_rows)?;
+                    *build_side = Some(merged_batch.clone());
+
+                    debug!(
+                        "Built build-side of cross join containing {} rows in {} ms",
+                        num_rows,
+                        start.elapsed().as_millis()
+                    );
+
+                    merged_batch
+                }
+            }
+        };
+
+        let stream = self.right.execute(partition).await?;
+
+        if left_data.num_rows() == 0 {
+            return Ok(Box::pin(MemoryStream::try_new(
+                vec![],
+                self.schema.clone(),
+                None,
+            )?));
+        }
+
+        Ok(Box::pin(CrossJoinStream {
+            schema: self.schema.clone(),
+            left_data,
+            right: stream,
+            right_batch: Arc::new(std::sync::Mutex::new(None)),
+            left_index: 0,
+            num_input_batches: 0,
+            num_input_rows: 0,
+            num_output_batches: 0,
+            num_output_rows: 0,
+            join_time: 0,
+        }))
+    }
+}
+
+/// A stream that issues [RecordBatch]es as they arrive from the right  of the join.
+struct CrossJoinStream {
+    /// Input schema
+    schema: Arc<Schema>,
+    /// data from the left side
+    left_data: JoinLeftData,
+    /// right
+    right: SendableRecordBatchStream,
+    /// Current value on the left
+    left_index: usize,
+    /// Current batch being processed from the right side
+    right_batch: Arc<std::sync::Mutex<Option<RecordBatch>>>,
+    /// number of input batches
+    num_input_batches: usize,
+    /// number of input rows
+    num_input_rows: usize,
+    /// number of batches produced
+    num_output_batches: usize,
+    /// number of rows produced
+    num_output_rows: usize,
+    /// total time for joining probe-side batches to the build-side batches
+    join_time: usize,
+}
+
+impl RecordBatchStream for CrossJoinStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+fn build_batch(
+    left_index: usize,
+    batch: &RecordBatch,
+    left_data: &RecordBatch,
+    schema: &Schema,
+) -> ArrowResult<RecordBatch> {
+    // Repeat value on the left n times
+    let arrays = left_data
+        .columns()
+        .iter()
+        .map(|arr| {
+            let scalar = ScalarValue::try_from_array(arr, left_index)?;
+            Ok(scalar.to_array_of_size(batch.num_rows()))
+        })
+        .collect::<Result<Vec<_>>>()
+        .map_err(|x| x.into_arrow_external_error())?;
+
+    RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        arrays
+            .iter()
+            .chain(batch.columns().iter())
+            .cloned()
+            .collect(),
+    )
+}
+
+#[async_trait]
+impl Stream for CrossJoinStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        if self.left_index > 0 && self.left_index < self.left_data.num_rows() {
+            let start = Instant::now();
+            let right_batch = {
+                let right_batch = self.right_batch.lock().unwrap();
+                right_batch.clone().unwrap()
+            };
+            let result =
+                build_batch(self.left_index, &right_batch, &self.left_data, &self.schema);
+            self.num_input_rows += right_batch.num_rows();
+            if let Ok(ref batch) = result {
+                self.join_time += start.elapsed().as_millis() as usize;
+                self.num_output_batches += 1;
+                self.num_output_rows += batch.num_rows();
+            }
+            self.left_index += 1;
+            return Poll::Ready(Some(result));
+        }
+        self.left_index = 0;
+        self.right
+            .poll_next_unpin(cx)
+            .map(|maybe_batch| match maybe_batch {
+                Some(Ok(batch)) => {
+                    let start = Instant::now();
+                    let result = build_batch(
+                        self.left_index,
+                        &batch,
+                        &self.left_data,
+                        &self.schema,
+                    );
+                    self.num_input_batches += 1;
+                    self.num_input_rows += batch.num_rows();
+                    if let Ok(ref batch) = result {
+                        self.join_time += start.elapsed().as_millis() as usize;
+                        self.num_output_batches += 1;
+                        self.num_output_rows += batch.num_rows();
+                    }
+                    self.left_index = 1;
+
+                    let mut right_batch = self.right_batch.lock().unwrap();
+                    *right_batch = Some(batch);
+
+                    Some(result)
+                }
+                other => {
+                    debug!(
+                        "Processed {} probe-side input batches containing {} rows and \
+                        produced {} output batches containing {} rows in {} ms",
+                        self.num_input_batches,
+                        self.num_input_rows,
+                        self.num_output_batches,
+                        self.num_output_rows,
+                        self.join_time
+                    );
+                    other
+                }
+            })
+    }
+}
diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs
index b26ff9b..a38cc09 100644
--- a/datafusion/src/physical_plan/hash_utils.rs
+++ b/datafusion/src/physical_plan/hash_utils.rs
@@ -52,11 +52,6 @@ fn check_join_set_is_valid(
     right: &HashSet<String>,
     on: &JoinOn,
 ) -> Result<()> {
-    if on.is_empty() {
-        return Err(DataFusionError::Plan(
-            "The 'on' clause of a join cannot be empty".to_string(),
-        ));
-    }
     let on_left = &on.iter().map(|on| on.0.to_string()).collect::<HashSet<_>>();
     let left_missing = on_left.difference(left).collect::<HashSet<_>>();
 
diff --git a/datafusion/src/physical_plan/memory.rs b/datafusion/src/physical_plan/memory.rs
index bef9bcc..9022077 100644
--- a/datafusion/src/physical_plan/memory.rs
+++ b/datafusion/src/physical_plan/memory.rs
@@ -17,6 +17,7 @@
 
 //! Execution plan for reading in-memory batches of data
 
+use core::fmt;
 use std::any::Any;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -31,7 +32,6 @@ use async_trait::async_trait;
 use futures::Stream;
 
 /// Execution plan for reading in-memory batches of data
-#[derive(Debug)]
 pub struct MemoryExec {
     /// The partitions to query
     partitions: Vec<Vec<RecordBatch>>,
@@ -41,6 +41,14 @@ pub struct MemoryExec {
     projection: Option<Vec<usize>>,
 }
 
+impl fmt::Debug for MemoryExec {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "partitions: [...]")?;
+        write!(f, "schema: {:?}", self.schema)?;
+        write!(f, "projection: {:?}", self.projection)
+    }
+}
+
 #[async_trait]
 impl ExecutionPlan for MemoryExec {
     /// Return a reference to Any that can be used for downcasting
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 80dfe6e..11f0946 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -345,6 +345,7 @@ pub mod aggregates;
 pub mod array_expressions;
 pub mod coalesce_batches;
 pub mod common;
+pub mod cross_join;
 #[cfg(feature = "crypto_expressions")]
 pub mod crypto_expressions;
 pub mod csv;
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index f9279ae..ae6ad50 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -20,8 +20,8 @@
 use std::sync::Arc;
 
 use super::{
-    aggregates, empty::EmptyExec, expressions::binary, functions,
-    hash_join::PartitionMode, udaf, union::UnionExec,
+    aggregates, cross_join::CrossJoinExec, empty::EmptyExec, expressions::binary,
+    functions, hash_join::PartitionMode, udaf, union::UnionExec,
 };
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::ExecutionContextState;
@@ -328,6 +328,11 @@ impl DefaultPhysicalPlanner {
                     )?))
                 }
             }
+            LogicalPlan::CrossJoin { left, right, .. } => {
+                let left = self.create_initial_plan(left, ctx_state)?;
+                let right = self.create_initial_plan(right, ctx_state)?;
+                Ok(Arc::new(CrossJoinExec::try_new(left, right)?))
+            }
             LogicalPlan::EmptyRelation {
                 produce_one_row,
                 schema,
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index f3cba23..a40d0be 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -355,12 +355,20 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             JoinOperator::Inner(constraint) => {
                 self.parse_join(left, &right, constraint, JoinType::Inner)
             }
+            JoinOperator::CrossJoin => self.parse_cross_join(left, &right),
             other => Err(DataFusionError::NotImplemented(format!(
                 "Unsupported JOIN operator {:?}",
                 other
             ))),
         }
     }
+    fn parse_cross_join(
+        &self,
+        left: &LogicalPlan,
+        right: &LogicalPlan,
+    ) -> Result<LogicalPlan> {
+        LogicalPlanBuilder::from(&left).cross_join(&right)?.build()
+    }
 
     fn parse_join(
         &self,
@@ -489,9 +497,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                         }
                     }
                     if join_keys.is_empty() {
-                        return Err(DataFusionError::NotImplemented(
-                            "Cartesian joins are not supported".to_string(),
-                        ));
+                        left =
+                            LogicalPlanBuilder::from(&left).cross_join(right)?.build()?;
                     } else {
                         let left_keys: Vec<_> =
                             join_keys.iter().map(|(l, _)| *l).collect();
@@ -517,9 +524,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 if plans.len() == 1 {
                     Ok(plans[0].clone())
                 } else {
-                    Err(DataFusionError::NotImplemented(
-                        "Cartesian joins are not supported".to_string(),
-                    ))
+                    let mut left = plans[0].clone();
+                    for right in plans.iter().skip(1) {
+                        left =
+                            LogicalPlanBuilder::from(&left).cross_join(right)?.build()?;
+                    }
+                    Ok(left)
                 }
             }
         };
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index f4d4e65..70baffc 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -1289,15 +1289,57 @@ async fn equijoin_implicit_syntax_reversed() -> Result<()> {
 }
 
 #[tokio::test]
-async fn cartesian_join() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id")?;
+async fn cross_join() {
+    let mut ctx = create_join_context("t1_id", "t2_id").unwrap();
+
     let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 ORDER BY t1_id";
-    let maybe_plan = ctx.create_logical_plan(&sql);
+    let actual = execute(&mut ctx, sql).await;
+
+    assert_eq!(4 * 4, actual.len());
+
+    let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE 1=1 ORDER BY t1_id";
+    let actual = execute(&mut ctx, sql).await;
+
+    assert_eq!(4 * 4, actual.len());
+
+    let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2";
+    let actual = execute(&mut ctx, sql).await;
+
+    assert_eq!(4 * 4, actual.len());
+
     assert_eq!(
-        "This feature is not implemented: Cartesian joins are not supported",
-        &format!("{}", maybe_plan.err().unwrap())
+        actual,
+        [
+            ["11", "a", "z"],
+            ["11", "a", "y"],
+            ["11", "a", "x"],
+            ["11", "a", "w"],
+            ["22", "b", "z"],
+            ["22", "b", "y"],
+            ["22", "b", "x"],
+            ["22", "b", "w"],
+            ["33", "c", "z"],
+            ["33", "c", "y"],
+            ["33", "c", "x"],
+            ["33", "c", "w"],
+            ["44", "d", "z"],
+            ["44", "d", "y"],
+            ["44", "d", "x"],
+            ["44", "d", "w"]
+        ]
     );
-    Ok(())
+
+    // Two partitions (from UNION) on the left
+    let sql = "SELECT * FROM (SELECT t1_id, t1_name FROM t1 UNION ALL SELECT t1_id, t1_name FROM t1) t1 CROSS JOIN t2";
+    let actual = execute(&mut ctx, sql).await;
+
+    assert_eq!(4 * 4 * 2, actual.len());
+
+    // Two partitions (from UNION) on the right
+    let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN (SELECT t2_name FROM t2 UNION ALL SELECT t2_name FROM t2)";
+    let actual = execute(&mut ctx, sql).await;
+
+    assert_eq!(4 * 4 * 2, actual.len());
 }
 
 fn create_join_context(

Reply via email to