viirya commented on code in PR #22038:
URL: https://github.com/apache/datafusion/pull/22038#discussion_r3469774807
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -908,6 +919,359 @@ pub(crate) struct LeftSpillData {
schema: SchemaRef,
}
+/// Per-chunk shared state in the memory-limited fallback path.
+///
+/// Each chunk's `JoinLeftData` is loaded once by a "leader" partition and
+/// shared (via `Arc`) with every right-side output partition. The
+/// `probe_threads_counter` inside the `JoinLeftData` is initialized to
+/// `right_partition_count`, so `report_probe_completed` returns `true`
+/// only when the *last* partition has finished probing the chunk. That
+/// last partition is then responsible for emitting unmatched left rows
+/// for the chunk, mirroring the single-pass path's coordination via
+/// `collect_left_input(..., probe_threads_count)`.
+struct CurrentChunk {
+ /// 0-based monotonically increasing chunk index.
+ chunk_index: usize,
+ /// Shared per-chunk left data. Cloned by every partition that probes
+ /// this chunk; the last to call `report_probe_completed` emits
+ /// unmatched left rows.
+ data: Arc<JoinLeftData>,
+ /// True if the left stream was exhausted while loading this chunk —
+ /// no further chunks will be produced after it.
+ is_last: bool,
+}
+
+/// Inner state of [`FallbackCoordinator`], guarded by an async mutex.
+struct FallbackCoordinatorInner {
+ /// Reservation owned by the coordinator. Holds the memory for the
+ /// currently-loaded chunk. Reset (`resize(0)`) between chunks.
+ /// Lazily registered by the first leader, after the runtime context
+ /// becomes available via `initiate_fallback`.
+ reservation: Option<MemoryReservation>,
+ /// The shared left spill stream from which chunks are read. Owned by
+ /// the coordinator so only one partition reads it at a time.
+ left_stream: Option<SendableRecordBatchStream>,
+ /// Left schema. Set after the first leader resolves the spill future.
+ left_schema: Option<SchemaRef>,
+ /// One batch carried over from the previous chunk's load: when
+ /// reservation `try_grow` failed for chunk N, the offending batch is
+ /// recorded here and becomes the first batch of chunk N+1.
+ carryover: Option<RecordBatch>,
+ /// True once the left spill stream has produced `None`.
+ left_exhausted: bool,
+ /// Index of the next chunk to be loaded.
+ next_chunk_index: usize,
+ /// The currently-loaded chunk, or `None` if no chunk is currently
+ /// loaded (initial state, or the last partition has just released
+ /// chunk `next_chunk_index - 1` and the next leader hasn't taken
+ /// over yet).
+ current: Option<CurrentChunk>,
+ /// True while a partition has claimed leader role for the next
+ /// chunk and is loading it; prevents two partitions from racing.
+ loader_in_flight: bool,
+}
+
+/// Plan-level shared coordinator for the memory-limited fallback path.
+///
+/// All right-side output partitions share one of these. It serializes
+/// access to the left spill stream (so each chunk is read exactly once),
+/// publishes the loaded chunk as an `Arc<JoinLeftData>` for every
+/// partition to clone, and uses a `Notify` so partitions waiting for the
+/// next chunk can sleep without busy-looping.
+pub(crate) struct FallbackCoordinator {
+ /// Number of right-side partitions; equals the
+ /// `probe_threads_counter` initial value for each chunk.
+ right_partition_count: usize,
+ /// Whether `JoinLeftData` should carry a left visited bitmap (for
+ /// join types that emit unmatched left rows in the final output).
+ with_visited_bitmap: bool,
+ inner: tokio::sync::Mutex<FallbackCoordinatorInner>,
+ /// Notified when a new chunk becomes available, when the left stream
+ /// is exhausted, or when a chunk is released.
+ notify: tokio::sync::Notify,
+}
+
+impl FallbackCoordinator {
+ fn new(right_partition_count: usize, with_visited_bitmap: bool) -> Self {
+ Self {
+ right_partition_count,
+ with_visited_bitmap,
+ inner: tokio::sync::Mutex::new(FallbackCoordinatorInner {
+ reservation: None,
+ left_stream: None,
+ left_schema: None,
+ carryover: None,
+ left_exhausted: false,
+ next_chunk_index: 0,
+ current: None,
+ loader_in_flight: false,
+ }),
+ notify: tokio::sync::Notify::new(),
+ }
+ }
+
+ /// After the last partition finishes processing chunk
+ /// `released_chunk_index`, drop the slot so the next leader can
+ /// load chunk `released_chunk_index + 1`.
+ async fn release_chunk(self: &Arc<Self>, released_chunk_index: usize) {
+ let mut inner = self.inner.lock().await;
+ if let Some(cur) = &inner.current
+ && cur.chunk_index == released_chunk_index
+ {
+ inner.current = None;
+ inner.next_chunk_index = released_chunk_index + 1;
+ }
+ // Always notify: waiters may be blocked because they couldn't
+ // become leader while a previous chunk was current.
+ drop(inner);
+ self.notify.notify_waiters();
+ }
+
+ /// Fetch `expected_chunk_index`, becoming leader to load it from the
+ /// left spill stream if no other partition has done so. Returns
+ /// `Ok(None)` when the left stream is exhausted and no chunk with
+ /// the requested index exists.
+ async fn next_chunk(
+ self: Arc<Self>,
+ expected_chunk_index: usize,
+ left_spill_fut: OnceFut<LeftSpillData>,
+ task_context: Arc<TaskContext>,
+ ) -> Result<Option<(Arc<JoinLeftData>, bool)>> {
+ // Resolve the left spill future once. All partitions share the
+ // same OnceFut so this only does real work the first time.
+ let spill_data = left_spill_fut_get_shared(left_spill_fut).await?;
+
+ loop {
+ let mut inner = self.inner.lock().await;
+
+ // Case 1: requested chunk is already loaded.
+ if let Some(cur) = &inner.current
+ && cur.chunk_index == expected_chunk_index
+ {
+ return Ok(Some((Arc::clone(&cur.data), cur.is_last)));
+ }
+
+ // Case 2: left stream exhausted, no current chunk, and no
+ // carryover batch left to seed another chunk — caller is past
+ // the last chunk.
+ if inner.left_exhausted
+ && inner.current.is_none()
+ && inner.carryover.is_none()
+ {
+ return Ok(None);
+ }
+
+ // Case 3: no chunk loaded and no leader yet — claim leader.
+ if inner.current.is_none() && !inner.loader_in_flight {
+ inner.loader_in_flight = true;
+ // Lazily initialize the shared left stream, schema, and
+ // chunk reservation. Only the leader does this (under the
+ // `loader_in_flight` guard), so waiters never open a
+ // throwaway spill stream that the leader would overwrite.
+ let mut left_stream = match inner.left_stream.take() {
+ Some(stream) => stream,
+ None => {
+ let stream = spill_data
+ .spill_manager
+ .read_spill_as_stream(spill_data.spill_file.clone(), None)?;
Review Comment:
Good eye — agreed this is worth hardening, even though the failure path isn't
reachable today: `read_spill_as_stream` only builds the buffered stream, and the
real I/O errors surface later while polling, where `load_one_chunk` already
clears `loader_in_flight` and notifies.
Fixed: on stream-construction failure the leader now clears
`loader_in_flight`, drops the lock, wakes waiters, and propagates the error, so
another partition can claim the leader role and retry rather than blocking on a
release that never comes. Pushed as a separate commit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]