alamb commented on code in PR #13823: URL: https://github.com/apache/datafusion/pull/13823#discussion_r1890158218
########## datafusion/physical-plan/src/execution_plan.rs: ########## @@ -488,95 +495,152 @@ impl ExecutionPlanProperties for &dyn ExecutionPlan { self.properties().output_partitioning() } - fn execution_mode(&self) -> ExecutionMode { - self.properties().execution_mode() - } - fn output_ordering(&self) -> Option<&LexOrdering> { self.properties().output_ordering() } + fn boundedness(&self) -> Boundedness { + self.properties().boundedness + } + + fn pipeline_behavior(&self) -> EmissionType { + self.properties().emission_type + } + fn equivalence_properties(&self) -> &EquivalenceProperties { self.properties().equivalence_properties() } } -/// Describes the execution mode of the result of calling -/// [`ExecutionPlan::execute`] with respect to its size and behavior. +/// Represents whether a stream of data **generated** by an operator is bounded (finite) +/// or unbounded (infinite). /// -/// The mode of the execution plan is determined by the mode of its input -/// execution plans and the details of the operator itself. For example, a -/// `FilterExec` operator will have the same execution mode as its input, but a -/// `SortExec` operator may have a different execution mode than its input, -/// depending on how the input stream is sorted. +/// This is used to determine whether an execution plan will eventually complete +/// processing all its data (bounded) or could potentially run forever (unbounded). /// -/// There are three possible execution modes: `Bounded`, `Unbounded` and -/// `PipelineBreaking`. -#[derive(Clone, Copy, PartialEq, Debug)] -pub enum ExecutionMode { - /// The stream is bounded / finite. - /// - /// In this case the stream will eventually return `None` to indicate that - /// there are no more records to process. +/// For unbounded streams, it also tracks whether the operator requires finite memory +/// to process the stream or if memory usage could grow unbounded. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Boundedness { + /// The data stream is bounded (finite) and will eventually complete Bounded, - /// The stream is unbounded / infinite. - /// - /// In this case, the stream will never be done (never return `None`), - /// except in case of error. - /// - /// This mode is often used in "Steaming" use cases where data is - /// incrementally processed as it arrives. - /// - /// Note that even though the operator generates an unbounded stream of - /// results, it can execute with bounded memory and incrementally produces - /// output. - Unbounded, - /// Some of the operator's input stream(s) are unbounded, but the operator - /// cannot generate streaming results from these streaming inputs. - /// - /// In this case, the execution mode will be pipeline breaking, e.g. the - /// operator requires unbounded memory to generate results. This - /// information is used by the planner when performing sanity checks - /// on plans processings unbounded data sources. - PipelineBreaking, + /// The data stream is unbounded (infinite) and could run forever + Unbounded { + /// Whether this operator requires infinite memory to process the unbounded stream. + /// If false, the operator can process an infinite stream with bounded memory. Review Comment: Maybe we can give an example of each type? Like is it the case that ```rust Unbounded { requires_infinite_memory: true, } ``` Would describe a min/max aggregate? ########## datafusion/physical-plan/src/execution_plan.rs: ########## @@ -488,95 +495,152 @@ impl ExecutionPlanProperties for &dyn ExecutionPlan { self.properties().output_partitioning() } - fn execution_mode(&self) -> ExecutionMode { - self.properties().execution_mode() - } - fn output_ordering(&self) -> Option<&LexOrdering> { self.properties().output_ordering() } + fn boundedness(&self) -> Boundedness { + self.properties().boundedness + } + + fn pipeline_behavior(&self) -> EmissionType { + self.properties().emission_type + } + fn equivalence_properties(&self) -> &EquivalenceProperties { self.properties().equivalence_properties() } } -/// Describes the execution mode of the result of calling -/// [`ExecutionPlan::execute`] with respect to its size and behavior. +/// Represents whether a stream of data **generated** by an operator is bounded (finite) +/// or unbounded (infinite). /// -/// The mode of the execution plan is determined by the mode of its input -/// execution plans and the details of the operator itself. For example, a -/// `FilterExec` operator will have the same execution mode as its input, but a -/// `SortExec` operator may have a different execution mode than its input, -/// depending on how the input stream is sorted. +/// This is used to determine whether an execution plan will eventually complete +/// processing all its data (bounded) or could potentially run forever (unbounded). /// -/// There are three possible execution modes: `Bounded`, `Unbounded` and -/// `PipelineBreaking`. -#[derive(Clone, Copy, PartialEq, Debug)] -pub enum ExecutionMode { - /// The stream is bounded / finite. - /// - /// In this case the stream will eventually return `None` to indicate that - /// there are no more records to process. +/// For unbounded streams, it also tracks whether the operator requires finite memory +/// to process the stream or if memory usage could grow unbounded. Review Comment: Can we clarify in these comments how Boundedness is related to the input stream? For example, if the the input stream is unbounded, does that imply the output is also unbounded? Maybe some examples would help to clarify this ########## datafusion/physical-plan/src/execution_plan.rs: ########## @@ -488,95 +495,152 @@ impl ExecutionPlanProperties for &dyn ExecutionPlan { self.properties().output_partitioning() } - fn execution_mode(&self) -> ExecutionMode { - self.properties().execution_mode() - } - fn output_ordering(&self) -> Option<&LexOrdering> { self.properties().output_ordering() } + fn boundedness(&self) -> Boundedness { + self.properties().boundedness + } + + fn pipeline_behavior(&self) -> EmissionType { + self.properties().emission_type + } + fn equivalence_properties(&self) -> &EquivalenceProperties { self.properties().equivalence_properties() } } -/// Describes the execution mode of the result of calling -/// [`ExecutionPlan::execute`] with respect to its size and behavior. +/// Represents whether a stream of data **generated** by an operator is bounded (finite) +/// or unbounded (infinite). /// -/// The mode of the execution plan is determined by the mode of its input -/// execution plans and the details of the operator itself. For example, a -/// `FilterExec` operator will have the same execution mode as its input, but a -/// `SortExec` operator may have a different execution mode than its input, -/// depending on how the input stream is sorted. +/// This is used to determine whether an execution plan will eventually complete +/// processing all its data (bounded) or could potentially run forever (unbounded). /// -/// There are three possible execution modes: `Bounded`, `Unbounded` and -/// `PipelineBreaking`. -#[derive(Clone, Copy, PartialEq, Debug)] -pub enum ExecutionMode { - /// The stream is bounded / finite. - /// - /// In this case the stream will eventually return `None` to indicate that - /// there are no more records to process. +/// For unbounded streams, it also tracks whether the operator requires finite memory +/// to process the stream or if memory usage could grow unbounded. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Boundedness { + /// The data stream is bounded (finite) and will eventually complete Bounded, - /// The stream is unbounded / infinite. - /// - /// In this case, the stream will never be done (never return `None`), - /// except in case of error. - /// - /// This mode is often used in "Steaming" use cases where data is - /// incrementally processed as it arrives. - /// - /// Note that even though the operator generates an unbounded stream of - /// results, it can execute with bounded memory and incrementally produces - /// output. - Unbounded, - /// Some of the operator's input stream(s) are unbounded, but the operator - /// cannot generate streaming results from these streaming inputs. - /// - /// In this case, the execution mode will be pipeline breaking, e.g. the - /// operator requires unbounded memory to generate results. This - /// information is used by the planner when performing sanity checks - /// on plans processings unbounded data sources. - PipelineBreaking, + /// The data stream is unbounded (infinite) and could run forever + Unbounded { + /// Whether this operator requires infinite memory to process the unbounded stream. + /// If false, the operator can process an infinite stream with bounded memory. + /// If true, memory usage may grow unbounded while processing the stream. + requires_infinite_memory: bool, + }, } -impl ExecutionMode { - /// Check whether the execution mode is unbounded or not. +impl Boundedness { pub fn is_unbounded(&self) -> bool { - matches!(self, ExecutionMode::Unbounded) + matches!(self, Boundedness::Unbounded { .. }) } +} - /// Check whether the execution is pipeline friendly. If so, operator can - /// execute safely. - pub fn pipeline_friendly(&self) -> bool { - matches!(self, ExecutionMode::Bounded | ExecutionMode::Unbounded) - } +/// Represents how an operator emits its output records. +/// +/// This is used to determine whether an operator emits records incrementally as they arrive, Review Comment: I suggest documenting when someone would expect incremental output -- I think it is basically when the operator can emit a RecordBatches of `batch_size` rows Specifically I think it would help to say something like "EmissionType::Incremental generates output incrementally but the operator may still buffer data internally until `batch_size` records arrive ########## datafusion/ffi/src/plan_properties.rs: ########## @@ -220,50 +235,89 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties { RErr(e) => Err(DataFusionError::Plan(e.to_string())), }?; - let execution_mode: ExecutionMode = - unsafe { (ffi_props.execution_mode)(&ffi_props).into() }; - let eq_properties = match orderings { Some(ordering) => { EquivalenceProperties::new_with_orderings(Arc::new(schema), &[ordering]) } None => EquivalenceProperties::new(Arc::new(schema)), }; + let emission_type: EmissionType = + unsafe { (ffi_props.emission_type)(&ffi_props).into() }; + + let boundedness: Boundedness = + unsafe { (ffi_props.boundedness)(&ffi_props).into() }; + Ok(PlanProperties::new( eq_properties, partitioning, - execution_mode, + emission_type, + boundedness, )) } } -/// FFI safe version of [`ExecutionMode`]. +/// FFI safe version of [`Boundedness`]. #[repr(C)] #[allow(non_camel_case_types)] #[derive(Clone, StableAbi)] -pub enum FFI_ExecutionMode { +pub enum FFI_Boundedness { Bounded, - Unbounded, Review Comment: I think so too -- Maybe we can leave the `FFI_ExecutionMode` present and mark it deprecated or something 🤔 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org