crepererum commented on code in PR #6045:
URL: https://github.com/apache/arrow-datafusion/pull/6045#discussion_r1172240150


##########
datafusion/core/src/physical_plan/union.rs:
##########
@@ -340,8 +278,222 @@ impl ExecutionPlan for UnionExec {
     }
 }
 
+/// Combines multiple input streams by interleaving them.
+///
+/// This only works if all inputs have the same hash-partitioning.
+///
+/// # Data Flow
+/// ```text
+/// +---------+
+/// |         |---+
+/// | Input 1 |   |
+/// |         |-------------+
+/// +---------+   |         |     
+///               |         |         +---------+
+///               +------------------>|         |
+///                 +---------------->| Combine |-->
+///                 | +-------------->|         |
+///                 | |     |         +---------+
+/// +---------+     | |     |       
+/// |         |-----+ |     |
+/// | Input 2 |       |     |
+/// |         |---------------+
+/// +---------+       |     | |    
+///                   |     | |       +---------+
+///                   |     +-------->|         |
+///                   |       +------>| Combine |-->
+///                   |         +---->|         |
+///                   |         |     +---------+
+/// +---------+       |         |     
+/// |         |-------+         |
+/// | Input 3 |                 |
+/// |         |-----------------+
+/// +---------+
+/// ```
+#[derive(Debug)]
+pub struct InterleaveExec {
+    /// Input execution plan
+    inputs: Vec<Arc<dyn ExecutionPlan>>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+    /// Schema of Interleave
+    schema: SchemaRef,
+}
+
+impl InterleaveExec {
+    /// Create a new InterleaveExec
+    pub fn try_new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Result<Self> {
+        let schema = union_schema(&inputs);
+
+        if !can_interleave(&inputs) {
+            return Err(DataFusionError::Internal(String::from(
+                "Not all InterleaveExec children have a consistent hash 
partitioning",
+            )));
+        }
+
+        Ok(InterleaveExec {
+            inputs,
+            metrics: ExecutionPlanMetricsSet::new(),
+            schema,
+        })
+    }
+
+    /// Get inputs of the execution plan
+    pub fn inputs(&self) -> &Vec<Arc<dyn ExecutionPlan>> {
+        &self.inputs
+    }
+}
+
+impl ExecutionPlan for InterleaveExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children.iter().any(|x| *x))
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        self.inputs.clone()
+    }
+
+    /// Output of the interleave is the combination of all output partitions of the inputs
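
As an aside for readers of this thread: to make the data-flow diagram above concrete, below is a minimal, self-contained sketch of the "Combine" step, where output partition `i` is fed by partition `i` of every input and the partition count is preserved. The names (`Partition`, `combine_partitions`) are illustrative assumptions only; the actual operator merges async `RecordBatch` streams rather than in-memory vectors.

```rust
// Illustrative sketch only: the "Combine" boxes in the diagram above, modeled
// with in-memory vectors instead of DataFusion's async record-batch streams.
// Output partition `i` collects the rows of partition `i` from every input.
type Partition = Vec<i64>;

fn combine_partitions(inputs: &[Vec<Partition>]) -> Vec<Partition> {
    // All inputs are assumed to share the same (hash) partitioning, so they
    // all expose the same number of partitions.
    let n = inputs.iter().map(|input| input.len()).max().unwrap_or(0);
    (0..n)
        .map(|i| {
            inputs
                .iter()
                .filter_map(|input| input.get(i))
                .flat_map(|partition| partition.iter().copied())
                .collect()
        })
        .collect()
}

fn main() {
    // Two inputs, each hash-partitioned into two partitions.
    let input_1: Vec<Partition> = vec![vec![1, 3], vec![2, 4]];
    let input_2: Vec<Partition> = vec![vec![5, 7], vec![6, 8]];
    let combined = combine_partitions(&[input_1, input_2]);

    // Two output partitions; rows that hashed to the same partition stay together.
    let expected: Vec<Partition> = vec![vec![1, 3, 5, 7], vec![2, 4, 6, 8]];
    assert_eq!(combined, expected);
}
```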

Review Comment:
   fixed
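
For context, here is a rough, self-contained sketch of the invariant that `try_new` enforces via `can_interleave` (whose body is not shown in this hunk): every input must report the same hash partitioning. The `Partitioning` stand-in and the check below are illustrative assumptions, not code from this PR.

```rust
// Illustrative only: a stand-in for DataFusion's `Partitioning` and a
// simplified version of the "consistent hash partitioning" check that
// `InterleaveExec::try_new` relies on. The real `can_interleave` may differ.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    RoundRobinBatch(usize),
    // Hash expressions (simplified to column names) plus partition count.
    Hash(Vec<String>, usize),
}

/// Inputs can be interleaved only if every one of them reports the same
/// hash partitioning (same expressions, same partition count).
fn can_interleave(inputs: &[Partitioning]) -> bool {
    let Some(first) = inputs.first() else {
        return false;
    };
    matches!(first, Partitioning::Hash(..)) && inputs.iter().all(|p| p == first)
}

fn main() {
    let a = Partitioning::Hash(vec!["id".to_string()], 4);
    let b = Partitioning::Hash(vec!["id".to_string()], 4);
    let c = Partitioning::RoundRobinBatch(4);

    assert!(can_interleave(&[a.clone(), b]));
    // A round-robin input breaks the invariant, so `try_new` would error.
    assert!(!can_interleave(&[a, c]));
}
```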


