This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-22623-249b599acfcd246c1734ea297b3b3901638d53fc in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit 18e7c8ed2b18c23750286ba914fe4a1e4d199719 Author: kosiew <[email protected]> AuthorDate: Fri Jun 5 01:32:05 2026 +0800 Refactor hash join build-report lifecycle into `BuildReportHandle` (#22623) ## Which issue does this PR close? * Closes #22622. ## Rationale for this change The build-report lifecycle for hash-join partitions was previously spread across `HashJoinStream`, `OnceFut` polling, and drop-time cancellation logic. Although correctness around scheduled versus delivered reports had already been addressed, the lifecycle ownership remained fragmented and difficult to reason about. This change centralizes lifecycle management into a dedicated abstraction, making state transitions explicit and ensuring drop-time behavior is self-contained and easier to maintain. ## What changes are included in this PR? * Introduce a new `BuildReportHandle` type that owns the lifecycle of a partition's build-data report. * Consolidate report lifecycle state management into explicit states: * `NotReported` * `Scheduled` * `Delivered` * `Canceled` * `Finalized` * Move report scheduling, delivery tracking, cancellation, and finalization logic out of `HashJoinStream` and into `BuildReportHandle`. * Implement drop-safe behavior in `BuildReportHandle` so pending scheduled reports are canceled when dropped before delivery. * Replace the stream's separate accumulator, waiter, and lifecycle state fields with a single `build_report` handle. * Add test-only helpers in `shared_bounds.rs` to construct partitioned accumulators and inspect completed partition counts. * Update lifecycle documentation to reflect the new centralized ownership model and state transitions. ## Are these changes tested? Yes. New tests were added covering lifecycle behavior and invariants: * `report_canceled_partition_is_noop_after_report` * `report_canceled_partition_marks_pending_partition_canceled` * `build_report_handle_cancels_scheduled_partition_on_drop` * `build_report_handle_does_not_cancel_delivered_partition_on_drop` * `build_report_handle_cancel_pending_is_idempotent` * `build_report_handle_no_accumulator_finalizes` These tests verify correct cancellation behavior, delivery tracking, terminal-state handling, and idempotency. ## Are there any user-facing changes? No. This is an internal refactoring of hash-join build-report lifecycle management and does not change user-facing behavior. ## LLM-generated code disclosure This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested. --- .../src/joins/hash_join/shared_bounds.rs | 66 ++++--- .../physical-plan/src/joins/hash_join/stream.rs | 210 +++++++++++++++++---- 2 files changed, 216 insertions(+), 60 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index fba6b2c2db..0af4015ff7 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -699,33 +699,49 @@ impl fmt::Debug for SharedBuildAccumulator { } } +#[cfg(test)] +pub(super) fn make_partitioned_accumulator_for_test( + num_partitions: usize, +) -> SharedBuildAccumulator { + let probe_schema = Arc::new(Schema::new(vec![Field::new( + "probe_key", + DataType::Int32, + false, + )])); + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true))); + SharedBuildAccumulator { + inner: Mutex::new(AccumulatorState { + data: AccumulatedBuildData::Partitioned { + partitions: vec![PartitionStatus::Pending; num_partitions], + completed_partitions: 0, + }, + completion: CompletionState::Pending, + }), + completion_notify: Notify::new(), + dynamic_filter, + on_right: vec![], + repartition_random_state: SeededRandomState::with_seed(1), + probe_schema, + } +} + +#[cfg(test)] +pub(super) fn completed_partitions_for_test(acc: &SharedBuildAccumulator) -> usize { + let guard = acc.inner.lock(); + let AccumulatedBuildData::Partitioned { + completed_partitions, + .. + } = &guard.data + else { + panic!("expected partitioned accumulator"); + }; + *completed_partitions +} + #[cfg(test)] mod tests { use super::*; - fn make_partitioned_accumulator(num_partitions: usize) -> SharedBuildAccumulator { - let probe_schema = Arc::new(Schema::new(vec![Field::new( - "probe_key", - DataType::Int32, - false, - )])); - let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true))); - SharedBuildAccumulator { - inner: Mutex::new(AccumulatorState { - data: AccumulatedBuildData::Partitioned { - partitions: vec![PartitionStatus::Pending; num_partitions], - completed_partitions: 0, - }, - completion: CompletionState::Pending, - }), - completion_notify: Notify::new(), - dynamic_filter, - on_right: vec![], - repartition_random_state: SeededRandomState::with_seed(1), - probe_schema, - } - } - fn partitioned_state(acc: &SharedBuildAccumulator) -> (Vec<PartitionStatus>, usize) { let guard = acc.inner.lock(); let AccumulatedBuildData::Partitioned { @@ -748,7 +764,7 @@ mod tests { // `Reported`. This test pins that invariant. #[test] fn report_canceled_partition_is_noop_after_report() { - let acc = make_partitioned_accumulator(2); + let acc = make_partitioned_accumulator_for_test(2); { let mut guard = acc.inner.lock(); @@ -780,7 +796,7 @@ mod tests { // which is what unblocks sibling partitions waiting on the coordinator. #[test] fn report_canceled_partition_marks_pending_partition_canceled() { - let acc = make_partitioned_accumulator(2); + let acc = make_partitioned_accumulator_for_test(2); acc.report_canceled_partition(0); let (partitions, completed) = partitioned_state(&acc); diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 040470c9be..d403fa43cd 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -173,15 +173,109 @@ impl ProcessProbeBatchState { /// Lifecycle of this partition's build-data report to the shared coordinator. /// -/// `ReportScheduled` means the reporting `OnceFut` has been constructed but is -/// lazy: the coordinator has not yet observed the report. Only `ReportDelivered` -/// guarantees the coordinator saw it, so `Drop` must still cancel the partition -/// when the state is `ReportScheduled` — otherwise sibling partitions wait -/// forever for a report that never runs. +/// `Scheduled` means the reporting `OnceFut` has been constructed but is lazy: +/// the coordinator has not necessarily observed the report. Only `Delivered` +/// guarantees the coordinator saw it, so `Drop` must still cancel a `Scheduled` +/// partition — otherwise sibling partitions can wait forever for a report that +/// never runs. +#[derive(Debug, PartialEq, Eq)] enum BuildReportState { NotReported, - ReportScheduled, - ReportDelivered, + Scheduled, + Delivered, + Canceled, + Finalized, +} + +/// Owns the stream-side lifecycle for one partition's build-data report. +struct BuildReportHandle { + partition: usize, + mode: PartitionMode, + build_accumulator: Option<Arc<SharedBuildAccumulator>>, + waiter: Option<OnceFut<()>>, + state: BuildReportState, +} + +impl BuildReportHandle { + fn new( + partition: usize, + mode: PartitionMode, + build_accumulator: Option<Arc<SharedBuildAccumulator>>, + ) -> Self { + Self { + partition, + mode, + build_accumulator, + waiter: None, + state: BuildReportState::NotReported, + } + } + + fn has_accumulator(&self) -> bool { + self.build_accumulator.is_some() + } + + fn schedule(&mut self, build_data: PartitionBuildData) { + let Some(build_accumulator) = &self.build_accumulator else { + // Defensive no-op terminal state; current callers avoid scheduling + // unless an accumulator is present. + self.finalize(); + return; + }; + + debug_assert!(matches!(self.state, BuildReportState::NotReported)); + let acc = Arc::clone(build_accumulator); + self.waiter = Some(OnceFut::new(async move { + acc.report_build_data(build_data).await + })); + self.state = BuildReportState::Scheduled; + } + + fn poll_delivery(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<()>> { + if let Some(ref mut fut) = self.waiter { + ready!(fut.get_shared(cx))?; + if !matches!(self.state, BuildReportState::Delivered) { + debug_assert!(matches!(self.state, BuildReportState::Scheduled)); + self.state = BuildReportState::Delivered; + } + } + Poll::Ready(Ok(())) + } + + fn cancel_pending(&mut self) { + if matches!( + self.state, + BuildReportState::Delivered + | BuildReportState::Canceled + | BuildReportState::Finalized + ) { + return; + } + + if self.mode == PartitionMode::Partitioned + && let Some(build_accumulator) = &self.build_accumulator + { + build_accumulator.report_canceled_partition(self.partition); + self.state = BuildReportState::Canceled; + } else { + self.finalize(); + } + } + + fn finalize(&mut self) { + self.state = BuildReportState::Finalized; + } + + #[cfg(test)] + fn state(&self) -> &BuildReportState { + &self.state + } +} + +impl Drop for BuildReportHandle { + fn drop(&mut self) { + self.cancel_pending(); + } } /// [`Stream`] for [`super::HashJoinExec`] that does the actual join. @@ -228,13 +322,8 @@ pub(super) struct HashJoinStream { build_indices_buffer: Vec<u64>, /// Specifies whether the right side has an ordering to potentially preserve right_side_ordered: bool, - /// Shared build accumulator for coordinating dynamic filter updates (collects hash maps and/or bounds, optional) - build_accumulator: Option<Arc<SharedBuildAccumulator>>, - /// Optional future to signal when build information has been reported by all partitions - /// and the dynamic filter has been updated - build_waiter: Option<OnceFut<()>>, - /// Tracks where this partition is in the build-data reporting lifecycle. - build_report_state: BuildReportState, + /// Owns this partition's build-data report lifecycle. + build_report: BuildReportHandle, /// Partitioning mode to use mode: PartitionMode, /// Output buffer for coalescing small batches into larger ones with optional fetch limit. @@ -414,9 +503,7 @@ impl HashJoinStream { probe_indices_buffer: Vec::with_capacity(batch_size), build_indices_buffer: Vec::with_capacity(batch_size), right_side_ordered, - build_accumulator, - build_waiter: None, - build_report_state: BuildReportState::NotReported, + build_report: BuildReportHandle::new(partition, mode, build_accumulator), mode, output_buffer, null_aware, @@ -449,9 +536,9 @@ impl HashJoinStream { &mut self, left_data: &Arc<JoinLeftData>, ) -> HashJoinStreamState { - let Some(build_accumulator) = self.build_accumulator.as_ref() else { + if !self.build_report.has_accumulator() { return Self::state_after_build_ready(self.join_type, left_data.as_ref()); - }; + } let pushdown = left_data.membership().clone(); let bounds = left_data @@ -473,11 +560,7 @@ impl HashJoinStream { ), }; - let acc = Arc::clone(build_accumulator); - self.build_waiter = Some(OnceFut::new(async move { - acc.report_build_data(build_data).await - })); - self.build_report_state = BuildReportState::ReportScheduled; + self.build_report.schedule(build_data); HashJoinStreamState::WaitPartitionBoundsReport } @@ -541,10 +624,7 @@ impl HashJoinStream { &mut self, cx: &mut std::task::Context<'_>, ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> { - if let Some(ref mut fut) = self.build_waiter { - ready!(fut.get_shared(cx))?; - self.build_report_state = BuildReportState::ReportDelivered; - } + ready!(self.build_report.poll_delivery(cx))?; let build_side = self.build_side.try_as_ready()?; self.state = Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref()); @@ -966,14 +1046,74 @@ impl Stream for HashJoinStream { } } -impl Drop for HashJoinStream { - fn drop(&mut self) { - if self.mode == PartitionMode::Partitioned - && !matches!(self.build_report_state, BuildReportState::ReportDelivered) - && let Some(build_accumulator) = &self.build_accumulator +#[cfg(test)] +mod tests { + use super::*; + use crate::joins::hash_join::shared_bounds::{ + PushdownStrategy, completed_partitions_for_test, + make_partitioned_accumulator_for_test, + }; + + fn empty_build_data(partition_id: usize) -> PartitionBuildData { + PartitionBuildData::Partitioned { + partition_id, + pushdown: PushdownStrategy::Empty, + bounds: PartitionBounds::new(vec![]), + } + } + + fn partitioned_handle(acc: &Arc<SharedBuildAccumulator>) -> BuildReportHandle { + BuildReportHandle::new(0, PartitionMode::Partitioned, Some(Arc::clone(acc))) + } + + #[test] + fn build_report_handle_cancels_scheduled_partition_on_drop() { + let acc = Arc::new(make_partitioned_accumulator_for_test(2)); + { - build_accumulator.report_canceled_partition(self.partition); - self.build_report_state = BuildReportState::ReportDelivered; + let mut handle = partitioned_handle(&acc); + handle.schedule(empty_build_data(0)); + assert_eq!(handle.state(), &BuildReportState::Scheduled); } + + assert_eq!(completed_partitions_for_test(&acc), 1); + } + + #[test] + fn build_report_handle_does_not_cancel_delivered_partition_on_drop() { + let acc = Arc::new(make_partitioned_accumulator_for_test(1)); + + { + let mut handle = partitioned_handle(&acc); + handle.schedule(empty_build_data(0)); + let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref()); + assert!(matches!(handle.poll_delivery(&mut cx), Poll::Ready(Ok(())))); + assert_eq!(handle.state(), &BuildReportState::Delivered); + } + + assert_eq!(completed_partitions_for_test(&acc), 1); + } + + #[test] + fn build_report_handle_cancel_pending_is_idempotent() { + let acc = Arc::new(make_partitioned_accumulator_for_test(2)); + let mut handle = partitioned_handle(&acc); + handle.schedule(empty_build_data(0)); + + handle.cancel_pending(); + handle.cancel_pending(); + + assert_eq!(handle.state(), &BuildReportState::Canceled); + assert_eq!(completed_partitions_for_test(&acc), 1); + } + + #[test] + fn build_report_handle_no_accumulator_finalizes() { + let mut handle = BuildReportHandle::new(0, PartitionMode::Partitioned, None); + + handle.schedule(empty_build_data(0)); + handle.cancel_pending(); + + assert_eq!(handle.state(), &BuildReportState::Finalized); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
