ozankabak commented on code in PR #13823: URL: https://github.com/apache/datafusion/pull/13823#discussion_r1890197827
########## datafusion/physical-plan/src/execution_plan.rs: ########## @@ -488,95 +495,152 @@ impl ExecutionPlanProperties for &dyn ExecutionPlan { self.properties().output_partitioning() } - fn execution_mode(&self) -> ExecutionMode { - self.properties().execution_mode() - } - fn output_ordering(&self) -> Option<&LexOrdering> { self.properties().output_ordering() } + fn boundedness(&self) -> Boundedness { + self.properties().boundedness + } + + fn pipeline_behavior(&self) -> EmissionType { + self.properties().emission_type + } + fn equivalence_properties(&self) -> &EquivalenceProperties { self.properties().equivalence_properties() } } -/// Describes the execution mode of the result of calling -/// [`ExecutionPlan::execute`] with respect to its size and behavior. +/// Represents whether a stream of data **generated** by an operator is bounded (finite) +/// or unbounded (infinite). /// -/// The mode of the execution plan is determined by the mode of its input -/// execution plans and the details of the operator itself. For example, a -/// `FilterExec` operator will have the same execution mode as its input, but a -/// `SortExec` operator may have a different execution mode than its input, -/// depending on how the input stream is sorted. +/// This is used to determine whether an execution plan will eventually complete +/// processing all its data (bounded) or could potentially run forever (unbounded). /// -/// There are three possible execution modes: `Bounded`, `Unbounded` and -/// `PipelineBreaking`. -#[derive(Clone, Copy, PartialEq, Debug)] -pub enum ExecutionMode { - /// The stream is bounded / finite. - /// - /// In this case the stream will eventually return `None` to indicate that - /// there are no more records to process. +/// For unbounded streams, it also tracks whether the operator requires finite memory +/// to process the stream or if memory usage could grow unbounded. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Boundedness { + /// The data stream is bounded (finite) and will eventually complete Bounded, - /// The stream is unbounded / infinite. - /// - /// In this case, the stream will never be done (never return `None`), - /// except in case of error. - /// - /// This mode is often used in "Steaming" use cases where data is - /// incrementally processed as it arrives. - /// - /// Note that even though the operator generates an unbounded stream of - /// results, it can execute with bounded memory and incrementally produces - /// output. - Unbounded, - /// Some of the operator's input stream(s) are unbounded, but the operator - /// cannot generate streaming results from these streaming inputs. - /// - /// In this case, the execution mode will be pipeline breaking, e.g. the - /// operator requires unbounded memory to generate results. This - /// information is used by the planner when performing sanity checks - /// on plans processings unbounded data sources. - PipelineBreaking, + /// The data stream is unbounded (infinite) and could run forever + Unbounded { + /// Whether this operator requires infinite memory to process the unbounded stream. + /// If false, the operator can process an infinite stream with bounded memory. + /// If true, memory usage may grow unbounded while processing the stream. + requires_infinite_memory: bool, + }, } -impl ExecutionMode { - /// Check whether the execution mode is unbounded or not. +impl Boundedness { pub fn is_unbounded(&self) -> bool { - matches!(self, ExecutionMode::Unbounded) + matches!(self, Boundedness::Unbounded { .. }) } +} - /// Check whether the execution is pipeline friendly. If so, operator can - /// execute safely. - pub fn pipeline_friendly(&self) -> bool { - matches!(self, ExecutionMode::Bounded | ExecutionMode::Unbounded) - } +/// Represents how an operator emits its output records. +/// +/// This is used to determine whether an operator emits records incrementally as they arrive, Review Comment: Yes, you have the right idea. We will add some comments to clarify. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org