mingmwang commented on code in PR #6045:
URL: https://github.com/apache/arrow-datafusion/pull/6045#discussion_r1171060878
##########
datafusion/core/src/physical_plan/union.rs:
##########
@@ -340,8 +278,222 @@ impl ExecutionPlan for UnionExec {
}
}
+/// Combines multiple input streams by interleaving them.
+///
+/// This only works if all inputs have the same hash-partitioning.
+///
+/// # Data Flow
+/// ```text
+/// +---------+
+/// | |---+
+/// | Input 1 | |
+/// | |-------------+
+/// +---------+ | |
+/// | | +---------+
+/// +------------------>| |
+/// +---------------->| Combine |-->
+/// | +-------------->| |
+/// | | | +---------+
+/// +---------+ | | |
+/// | |-----+ | |
+/// | Input 2 | | |
+/// | |---------------+
+/// +---------+ | | |
+/// | | | +---------+
+/// | +-------->| |
+/// | +------>| Combine |-->
+/// | +---->| |
+/// | | +---------+
+/// +---------+ | |
+/// | |-------+ |
+/// | Input 3 | |
+/// | |-----------------+
+/// +---------+
+/// ```
+#[derive(Debug)]
+pub struct InterleaveExec {
+ /// Input execution plan
+ inputs: Vec<Arc<dyn ExecutionPlan>>,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
+ /// Schema of Union
+ schema: SchemaRef,
+}
+
+impl InterleaveExec {
+ /// Create a new InterleaveExec
+ pub fn try_new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Result<Self> {
+ let schema = union_schema(&inputs);
+
+ if !can_interleave(&inputs) {
+ return Err(DataFusionError::Internal(String::from(
+ "Not all InterleaveExec children have a consistent hash
partitioning",
+ )));
+ }
+
+ Ok(InterleaveExec {
+ inputs,
+ metrics: ExecutionPlanMetricsSet::new(),
+ schema,
+ })
+ }
+
+ /// Get inputs of the execution plan
+ pub fn inputs(&self) -> &Vec<Arc<dyn ExecutionPlan>> {
+ &self.inputs
+ }
+}
+
+impl ExecutionPlan for InterleaveExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ Ok(children.iter().any(|x| *x))
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ self.inputs.clone()
+ }
+
+ /// Output partitioning is that of the first input; all inputs are required to share a consistent hash partitioning
+ fn output_partitioning(&self) -> Partitioning {
+ self.inputs[0].output_partitioning()
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ None
+ }
+
+ fn maintains_input_order(&self) -> Vec<bool> {
+ vec![false; self.inputs().len()]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(InterleaveExec::try_new(children)?))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ debug!("Start InterleaveExec::execute for partition {} of context
session_id {} and task_id {:?}", partition, context.session_id(),
context.task_id());
+ let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+ // record the tiny amount of work done in this function so
+ // elapsed_compute is reported as non zero
+ let elapsed_compute = baseline_metrics.elapsed_compute().clone();
+ let _timer = elapsed_compute.timer(); // record on drop
+
+ let mut input_stream_vec = vec![];
+ for input in self.inputs.iter() {
+ if partition < input.output_partitioning().partition_count() {
+ input_stream_vec.push(input.execute(partition,
context.clone())?);
+ } else {
+ // Do not find a partition to execute
+ break;
+ }
+ }
+ if input_stream_vec.len() == self.inputs.len() {
+ let stream = Box::pin(CombinedRecordBatchStream::new(
+ self.schema(),
+ input_stream_vec,
+ ));
+ return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+ }
+
+ warn!("Error in InterleaveExec: Partition {} not found", partition);
+
+ Err(crate::error::DataFusionError::Execution(format!(
+ "Partition {partition} not found in Union"
Review Comment:
The error message still refers to Union; it should name this operator instead:
"Partition {partition} not found in InterleaveExec"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]