This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 18e7c8ed2b Refactor hash join build-report lifecycle into
`BuildReportHandle` (#22623)
18e7c8ed2b is described below
commit 18e7c8ed2b18c23750286ba914fe4a1e4d199719
Author: kosiew <[email protected]>
AuthorDate: Fri Jun 5 01:32:05 2026 +0800
Refactor hash join build-report lifecycle into `BuildReportHandle` (#22623)
## Which issue does this PR close?
* Closes #22622.
## Rationale for this change
The build-report lifecycle for hash-join partitions was previously
spread across `HashJoinStream`, `OnceFut` polling, and drop-time
cancellation logic. Although correctness around scheduled versus
delivered reports had already been addressed, the lifecycle ownership
remained fragmented and difficult to reason about.
This change centralizes lifecycle management into a dedicated
abstraction, making state transitions explicit and ensuring drop-time
behavior is self-contained and easier to maintain.
## What changes are included in this PR?
* Introduce a new `BuildReportHandle` type that owns the lifecycle of a
partition's build-data report.
* Consolidate report lifecycle state management into explicit states:
* `NotReported`
* `Scheduled`
* `Delivered`
* `Canceled`
* `Finalized`
* Move report scheduling, delivery tracking, cancellation, and
finalization logic out of `HashJoinStream` and into `BuildReportHandle`.
* Implement drop-safe behavior in `BuildReportHandle` so that, in
`Partitioned` mode, a scheduled report that was never delivered is
canceled when the handle is dropped (in other modes the handle simply
finalizes).
* Replace the stream's separate accumulator, waiter, and lifecycle state
fields with a single `build_report` handle.
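* Move the partitioned-accumulator test constructor in `shared_bounds.rs`
out of its `tests` module (renamed `make_partitioned_accumulator_for_test`)
and add `completed_partitions_for_test`, so tests in `stream.rs` can
construct partitioned accumulators and inspect completed partition counts.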
* Update lifecycle documentation to reflect the new centralized
ownership model and state transitions.
## Are these changes tested?
Yes.
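New and updated tests cover lifecycle behavior and invariants (the two
`report_canceled_partition_*` tests already existed and were updated to use
the shared test helper; the `build_report_handle_*` tests are new):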
* `report_canceled_partition_is_noop_after_report`
* `report_canceled_partition_marks_pending_partition_canceled`
* `build_report_handle_cancels_scheduled_partition_on_drop`
* `build_report_handle_does_not_cancel_delivered_partition_on_drop`
* `build_report_handle_cancel_pending_is_idempotent`
* `build_report_handle_no_accumulator_finalizes`
These tests verify correct cancellation behavior, delivery tracking,
terminal-state handling, and idempotency.
## Are there any user-facing changes?
No. This is an internal refactoring of hash-join build-report lifecycle
management and does not change user-facing behavior.
## LLM-generated code disclosure
This PR includes LLM-generated code and comments. All LLM-generated
content has been manually reviewed and tested.
---
.../src/joins/hash_join/shared_bounds.rs | 66 ++++---
.../physical-plan/src/joins/hash_join/stream.rs | 210 +++++++++++++++++----
2 files changed, 216 insertions(+), 60 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
index fba6b2c2db..0af4015ff7 100644
--- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
@@ -699,33 +699,49 @@ impl fmt::Debug for SharedBuildAccumulator {
}
}
+#[cfg(test)]
+pub(super) fn make_partitioned_accumulator_for_test(
+ num_partitions: usize,
+) -> SharedBuildAccumulator {
+ let probe_schema = Arc::new(Schema::new(vec![Field::new(
+ "probe_key",
+ DataType::Int32,
+ false,
+ )]));
+ let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![],
lit(true)));
+ SharedBuildAccumulator {
+ inner: Mutex::new(AccumulatorState {
+ data: AccumulatedBuildData::Partitioned {
+ partitions: vec![PartitionStatus::Pending; num_partitions],
+ completed_partitions: 0,
+ },
+ completion: CompletionState::Pending,
+ }),
+ completion_notify: Notify::new(),
+ dynamic_filter,
+ on_right: vec![],
+ repartition_random_state: SeededRandomState::with_seed(1),
+ probe_schema,
+ }
+}
+
+#[cfg(test)]
+pub(super) fn completed_partitions_for_test(acc: &SharedBuildAccumulator) ->
usize {
+ let guard = acc.inner.lock();
+ let AccumulatedBuildData::Partitioned {
+ completed_partitions,
+ ..
+ } = &guard.data
+ else {
+ panic!("expected partitioned accumulator");
+ };
+ *completed_partitions
+}
+
#[cfg(test)]
mod tests {
use super::*;
- fn make_partitioned_accumulator(num_partitions: usize) ->
SharedBuildAccumulator {
- let probe_schema = Arc::new(Schema::new(vec![Field::new(
- "probe_key",
- DataType::Int32,
- false,
- )]));
- let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![],
lit(true)));
- SharedBuildAccumulator {
- inner: Mutex::new(AccumulatorState {
- data: AccumulatedBuildData::Partitioned {
- partitions: vec![PartitionStatus::Pending; num_partitions],
- completed_partitions: 0,
- },
- completion: CompletionState::Pending,
- }),
- completion_notify: Notify::new(),
- dynamic_filter,
- on_right: vec![],
- repartition_random_state: SeededRandomState::with_seed(1),
- probe_schema,
- }
- }
-
fn partitioned_state(acc: &SharedBuildAccumulator) ->
(Vec<PartitionStatus>, usize) {
let guard = acc.inner.lock();
let AccumulatedBuildData::Partitioned {
@@ -748,7 +764,7 @@ mod tests {
// `Reported`. This test pins that invariant.
#[test]
fn report_canceled_partition_is_noop_after_report() {
- let acc = make_partitioned_accumulator(2);
+ let acc = make_partitioned_accumulator_for_test(2);
{
let mut guard = acc.inner.lock();
@@ -780,7 +796,7 @@ mod tests {
// which is what unblocks sibling partitions waiting on the coordinator.
#[test]
fn report_canceled_partition_marks_pending_partition_canceled() {
- let acc = make_partitioned_accumulator(2);
+ let acc = make_partitioned_accumulator_for_test(2);
acc.report_canceled_partition(0);
let (partitions, completed) = partitioned_state(&acc);
diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs
b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index 040470c9be..d403fa43cd 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -173,15 +173,109 @@ impl ProcessProbeBatchState {
/// Lifecycle of this partition's build-data report to the shared coordinator.
///
-/// `ReportScheduled` means the reporting `OnceFut` has been constructed but is
-/// lazy: the coordinator has not yet observed the report. Only
`ReportDelivered`
-/// guarantees the coordinator saw it, so `Drop` must still cancel the
partition
-/// when the state is `ReportScheduled` — otherwise sibling partitions wait
-/// forever for a report that never runs.
+/// `Scheduled` means the reporting `OnceFut` has been constructed but is lazy:
+/// the coordinator has not necessarily observed the report. Only `Delivered`
+/// guarantees the coordinator saw it, so `Drop` must still cancel a
`Scheduled`
+/// partition — otherwise sibling partitions can wait forever for a report that
+/// never runs.
+#[derive(Debug, PartialEq, Eq)]
enum BuildReportState {
NotReported,
- ReportScheduled,
- ReportDelivered,
+ Scheduled,
+ Delivered,
+ Canceled,
+ Finalized,
+}
+
+/// Owns the stream-side lifecycle for one partition's build-data report.
+struct BuildReportHandle {
+ partition: usize,
+ mode: PartitionMode,
+ build_accumulator: Option<Arc<SharedBuildAccumulator>>,
+ waiter: Option<OnceFut<()>>,
+ state: BuildReportState,
+}
+
+impl BuildReportHandle {
+ fn new(
+ partition: usize,
+ mode: PartitionMode,
+ build_accumulator: Option<Arc<SharedBuildAccumulator>>,
+ ) -> Self {
+ Self {
+ partition,
+ mode,
+ build_accumulator,
+ waiter: None,
+ state: BuildReportState::NotReported,
+ }
+ }
+
+ fn has_accumulator(&self) -> bool {
+ self.build_accumulator.is_some()
+ }
+
+ fn schedule(&mut self, build_data: PartitionBuildData) {
+ let Some(build_accumulator) = &self.build_accumulator else {
+ // Defensive no-op terminal state; current callers avoid scheduling
+ // unless an accumulator is present.
+ self.finalize();
+ return;
+ };
+
+ debug_assert!(matches!(self.state, BuildReportState::NotReported));
+ let acc = Arc::clone(build_accumulator);
+ self.waiter = Some(OnceFut::new(async move {
+ acc.report_build_data(build_data).await
+ }));
+ self.state = BuildReportState::Scheduled;
+ }
+
+ fn poll_delivery(&mut self, cx: &mut std::task::Context<'_>) ->
Poll<Result<()>> {
+ if let Some(ref mut fut) = self.waiter {
+ ready!(fut.get_shared(cx))?;
+ if !matches!(self.state, BuildReportState::Delivered) {
+ debug_assert!(matches!(self.state,
BuildReportState::Scheduled));
+ self.state = BuildReportState::Delivered;
+ }
+ }
+ Poll::Ready(Ok(()))
+ }
+
+ fn cancel_pending(&mut self) {
+ if matches!(
+ self.state,
+ BuildReportState::Delivered
+ | BuildReportState::Canceled
+ | BuildReportState::Finalized
+ ) {
+ return;
+ }
+
+ if self.mode == PartitionMode::Partitioned
+ && let Some(build_accumulator) = &self.build_accumulator
+ {
+ build_accumulator.report_canceled_partition(self.partition);
+ self.state = BuildReportState::Canceled;
+ } else {
+ self.finalize();
+ }
+ }
+
+ fn finalize(&mut self) {
+ self.state = BuildReportState::Finalized;
+ }
+
+ #[cfg(test)]
+ fn state(&self) -> &BuildReportState {
+ &self.state
+ }
+}
+
+impl Drop for BuildReportHandle {
+ fn drop(&mut self) {
+ self.cancel_pending();
+ }
}
/// [`Stream`] for [`super::HashJoinExec`] that does the actual join.
@@ -228,13 +322,8 @@ pub(super) struct HashJoinStream {
build_indices_buffer: Vec<u64>,
/// Specifies whether the right side has an ordering to potentially
preserve
right_side_ordered: bool,
- /// Shared build accumulator for coordinating dynamic filter updates
(collects hash maps and/or bounds, optional)
- build_accumulator: Option<Arc<SharedBuildAccumulator>>,
- /// Optional future to signal when build information has been reported by
all partitions
- /// and the dynamic filter has been updated
- build_waiter: Option<OnceFut<()>>,
- /// Tracks where this partition is in the build-data reporting lifecycle.
- build_report_state: BuildReportState,
+ /// Owns this partition's build-data report lifecycle.
+ build_report: BuildReportHandle,
/// Partitioning mode to use
mode: PartitionMode,
/// Output buffer for coalescing small batches into larger ones with
optional fetch limit.
@@ -414,9 +503,7 @@ impl HashJoinStream {
probe_indices_buffer: Vec::with_capacity(batch_size),
build_indices_buffer: Vec::with_capacity(batch_size),
right_side_ordered,
- build_accumulator,
- build_waiter: None,
- build_report_state: BuildReportState::NotReported,
+ build_report: BuildReportHandle::new(partition, mode,
build_accumulator),
mode,
output_buffer,
null_aware,
@@ -449,9 +536,9 @@ impl HashJoinStream {
&mut self,
left_data: &Arc<JoinLeftData>,
) -> HashJoinStreamState {
- let Some(build_accumulator) = self.build_accumulator.as_ref() else {
+ if !self.build_report.has_accumulator() {
return Self::state_after_build_ready(self.join_type,
left_data.as_ref());
- };
+ }
let pushdown = left_data.membership().clone();
let bounds = left_data
@@ -473,11 +560,7 @@ impl HashJoinStream {
),
};
- let acc = Arc::clone(build_accumulator);
- self.build_waiter = Some(OnceFut::new(async move {
- acc.report_build_data(build_data).await
- }));
- self.build_report_state = BuildReportState::ReportScheduled;
+ self.build_report.schedule(build_data);
HashJoinStreamState::WaitPartitionBoundsReport
}
@@ -541,10 +624,7 @@ impl HashJoinStream {
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
- if let Some(ref mut fut) = self.build_waiter {
- ready!(fut.get_shared(cx))?;
- self.build_report_state = BuildReportState::ReportDelivered;
- }
+ ready!(self.build_report.poll_delivery(cx))?;
let build_side = self.build_side.try_as_ready()?;
self.state =
Self::state_after_build_ready(self.join_type,
build_side.left_data.as_ref());
@@ -966,14 +1046,74 @@ impl Stream for HashJoinStream {
}
}
-impl Drop for HashJoinStream {
- fn drop(&mut self) {
- if self.mode == PartitionMode::Partitioned
- && !matches!(self.build_report_state,
BuildReportState::ReportDelivered)
- && let Some(build_accumulator) = &self.build_accumulator
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::joins::hash_join::shared_bounds::{
+ PushdownStrategy, completed_partitions_for_test,
+ make_partitioned_accumulator_for_test,
+ };
+
+ fn empty_build_data(partition_id: usize) -> PartitionBuildData {
+ PartitionBuildData::Partitioned {
+ partition_id,
+ pushdown: PushdownStrategy::Empty,
+ bounds: PartitionBounds::new(vec![]),
+ }
+ }
+
+ fn partitioned_handle(acc: &Arc<SharedBuildAccumulator>) ->
BuildReportHandle {
+ BuildReportHandle::new(0, PartitionMode::Partitioned,
Some(Arc::clone(acc)))
+ }
+
+ #[test]
+ fn build_report_handle_cancels_scheduled_partition_on_drop() {
+ let acc = Arc::new(make_partitioned_accumulator_for_test(2));
+
{
- build_accumulator.report_canceled_partition(self.partition);
- self.build_report_state = BuildReportState::ReportDelivered;
+ let mut handle = partitioned_handle(&acc);
+ handle.schedule(empty_build_data(0));
+ assert_eq!(handle.state(), &BuildReportState::Scheduled);
}
+
+ assert_eq!(completed_partitions_for_test(&acc), 1);
+ }
+
+ #[test]
+ fn build_report_handle_does_not_cancel_delivered_partition_on_drop() {
+ let acc = Arc::new(make_partitioned_accumulator_for_test(1));
+
+ {
+ let mut handle = partitioned_handle(&acc);
+ handle.schedule(empty_build_data(0));
+ let mut cx =
std::task::Context::from_waker(futures::task::noop_waker_ref());
+ assert!(matches!(handle.poll_delivery(&mut cx),
Poll::Ready(Ok(()))));
+ assert_eq!(handle.state(), &BuildReportState::Delivered);
+ }
+
+ assert_eq!(completed_partitions_for_test(&acc), 1);
+ }
+
+ #[test]
+ fn build_report_handle_cancel_pending_is_idempotent() {
+ let acc = Arc::new(make_partitioned_accumulator_for_test(2));
+ let mut handle = partitioned_handle(&acc);
+ handle.schedule(empty_build_data(0));
+
+ handle.cancel_pending();
+ handle.cancel_pending();
+
+ assert_eq!(handle.state(), &BuildReportState::Canceled);
+ assert_eq!(completed_partitions_for_test(&acc), 1);
+ }
+
+ #[test]
+ fn build_report_handle_no_accumulator_finalizes() {
+ let mut handle = BuildReportHandle::new(0, PartitionMode::Partitioned,
None);
+
+ handle.schedule(empty_build_data(0));
+ handle.cancel_pending();
+
+ assert_eq!(handle.state(), &BuildReportState::Finalized);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]