UBarney commented on code in PR #16889: URL: https://github.com/apache/datafusion/pull/16889#discussion_r2232819590
########## datafusion/physical-plan/src/joins/nlj.rs: ########## @@ -0,0 +1,803 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Implementation of the Nested Loop Join operator. +//! +//! For detailed information regarding the operator's state machine and execution flow, +//! please refer to the documentation provided in the `poll_next()` method. 
+ +use arrow::buffer::MutableBuffer; +use arrow::compute::BatchCoalescer; +use futures::{ready, StreamExt}; +use log::debug; +use std::sync::Arc; +use std::task::Poll; + +use crate::joins::nested_loop_join::JoinLeftData; +use crate::joins::utils::{ + apply_join_filter_to_indices, build_batch_from_indices_maybe_empty, + need_produce_result_in_final, BuildProbeJoinMetrics, ColumnIndex, JoinFilter, + OnceFut, +}; +use crate::metrics::Count; +use crate::{RecordBatchStream, SendableRecordBatchStream}; + +use arrow::array::{ + BooleanArray, BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array, + UInt64Builder, +}; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use datafusion_common::{ + internal_datafusion_err, unwrap_or_internal_err, DataFusionError, JoinSide, Result, +}; +use datafusion_expr::JoinType; + +use futures::Stream; + +/// States for join processing. See `poll_next()` comment for more details about +/// state transitions. +#[derive(Debug, Clone, Copy)] +enum NLJState { + BufferingLeft, + FetchingRight, + ProbeRight, + EmitRightUnmatched, + EmitLeftUnmatched, + Done, +} + +pub(crate) struct NLJStream { + // ======================================================================== + // PROPERTIES: + // Operator's properties that remain constant + // ======================================================================== + /// Output schema + pub(crate) output_schema: Arc<Schema>, + /// join filter + pub(crate) join_filter: Option<JoinFilter>, + /// type of the join + pub(crate) join_type: JoinType, + /// the outer table data of the nested loop join + pub(crate) outer_table: SendableRecordBatchStream, + /// the inner table data of the nested loop join + pub(crate) inner_table: OnceFut<JoinLeftData>, + /// Information of index and left / right placement of columns + pub(crate) column_indices: Vec<ColumnIndex>, + /// Join execution metrics + pub(crate) join_metrics: BuildProbeJoinMetrics, + + /// `batch_size` from 
configuration + cfg_batch_size: usize, + + /// Should we use a bitmap to track each incoming right batch's each row's + /// 'joined' status. + /// For example in right joins, we have to use a bit map to track matched + /// right side rows, and later enter a `EmitRightUnmatched` stage to emit + /// unmatched right rows. + should_track_unmatched_right: bool, + + // ======================================================================== + // STATE FLAGS/BUFFERS: + // Fields that hold intermediate data/flags during execution + // ======================================================================== + /// State Tracking + state: NLJState, + /// Output buffer holds the join result to output. It will emit eagerly when + /// the threshold is reached. + output_buffer: Box<BatchCoalescer>, + /// See comments in `NLJState::Done` for its purpose + handled_empty_output: bool, + + // Buffer(left) side + // ----------------- + /// The current buffered left data to join + buffered_left_data: Option<Arc<JoinLeftData>>, + /// Index into the left buffered batch. Used in `ProbeRight` state + l_index: usize, + /// Index into the left buffered batch. Used in `EmitLeftUnmatched` state + emit_cursor: u64, + /// Should we go back to `BufferingLeft` state again after `EmitLeftUnmatched` + /// state is over. + left_exhausted: bool, + /// If we can buffer all left data in one pass + /// TODO(now): this is for the (unimplemented) memory-limited execution + #[allow(dead_code)] + left_buffered_in_one_pass: bool, + + // Probe(right) side + // ----------------- + /// The current probe batch to process + current_right_batch: Option<RecordBatch>, + // For right join, keep track of matched rows in `current_right_batch` + // Constructed when fetching each new incoming right batch in `FetchingRight` state. 
+ current_right_batch_matched: Option<BooleanBufferBuilder>, +} + +impl Stream for NLJStream { + type Item = Result<RecordBatch>; + + /// # Design + /// + /// The high-level control flow for this operator is: + /// 1. Buffer all batches from the left side (unless memory limit is reached, + /// in which case see notes at 'Memory-limited Execution'). + /// - Rationale: The right side scanning can be expensive (it might + /// include decoding Parquet files), so it tries to buffer more left + /// batches at once to minimize the scan passes. + /// 2. Read right side batch one at a time. For each iteration, it only + /// evaluates the join filter on (1-left-row x right-batch), and puts the Review Comment: With this join method, it seems we can't preserve the order of the right table. https://github.com/apache/datafusion/issues/16364#issuecomment-2975520489 ########## datafusion/physical-plan/src/joins/nlj.rs: ########## @@ -0,0 +1,803 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Implementation of the Nested Loop Join operator. +//! +//! For detailed information regarding the operator's state machine and execution flow, +//! please refer to the documentation provided in the `poll_next()` method. 
+ +use arrow::buffer::MutableBuffer; +use arrow::compute::BatchCoalescer; +use futures::{ready, StreamExt}; +use log::debug; +use std::sync::Arc; +use std::task::Poll; + +use crate::joins::nested_loop_join::JoinLeftData; +use crate::joins::utils::{ + apply_join_filter_to_indices, build_batch_from_indices_maybe_empty, + need_produce_result_in_final, BuildProbeJoinMetrics, ColumnIndex, JoinFilter, + OnceFut, +}; +use crate::metrics::Count; +use crate::{RecordBatchStream, SendableRecordBatchStream}; + +use arrow::array::{ + BooleanArray, BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array, + UInt64Builder, +}; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use datafusion_common::{ + internal_datafusion_err, unwrap_or_internal_err, DataFusionError, JoinSide, Result, +}; +use datafusion_expr::JoinType; + +use futures::Stream; + +/// States for join processing. See `poll_next()` comment for more details about +/// state transitions. +#[derive(Debug, Clone, Copy)] +enum NLJState { + BufferingLeft, + FetchingRight, + ProbeRight, + EmitRightUnmatched, + EmitLeftUnmatched, + Done, +} + +pub(crate) struct NLJStream { + // ======================================================================== + // PROPERTIES: + // Operator's properties that remain constant + // ======================================================================== + /// Output schema + pub(crate) output_schema: Arc<Schema>, + /// join filter + pub(crate) join_filter: Option<JoinFilter>, + /// type of the join + pub(crate) join_type: JoinType, + /// the outer table data of the nested loop join + pub(crate) outer_table: SendableRecordBatchStream, + /// the inner table data of the nested loop join + pub(crate) inner_table: OnceFut<JoinLeftData>, + /// Information of index and left / right placement of columns + pub(crate) column_indices: Vec<ColumnIndex>, + /// Join execution metrics + pub(crate) join_metrics: BuildProbeJoinMetrics, + + /// `batch_size` from 
configuration + cfg_batch_size: usize, + + /// Should we use a bitmap to track each incoming right batch's each row's + /// 'joined' status. + /// For example in right joins, we have to use a bit map to track matched + /// right side rows, and later enter a `EmitRightUnmatched` stage to emit + /// unmatched right rows. + should_track_unmatched_right: bool, + + // ======================================================================== + // STATE FLAGS/BUFFERS: + // Fields that hold intermediate data/flags during execution + // ======================================================================== + /// State Tracking + state: NLJState, + /// Output buffer holds the join result to output. It will emit eagerly when + /// the threshold is reached. + output_buffer: Box<BatchCoalescer>, + /// See comments in `NLJState::Done` for its purpose + handled_empty_output: bool, + + // Buffer(left) side + // ----------------- + /// The current buffered left data to join + buffered_left_data: Option<Arc<JoinLeftData>>, + /// Index into the left buffered batch. Used in `ProbeRight` state + l_index: usize, + /// Index into the left buffered batch. Used in `EmitLeftUnmatched` state + emit_cursor: u64, + /// Should we go back to `BufferingLeft` state again after `EmitLeftUnmatched` + /// state is over. + left_exhausted: bool, + /// If we can buffer all left data in one pass + /// TODO(now): this is for the (unimplemented) memory-limited execution + #[allow(dead_code)] + left_buffered_in_one_pass: bool, + + // Probe(right) side + // ----------------- + /// The current probe batch to process + current_right_batch: Option<RecordBatch>, + // For right join, keep track of matched rows in `current_right_batch` + // Constructed when fetching each new incoming right batch in `FetchingRight` state. 
+ current_right_batch_matched: Option<BooleanBufferBuilder>, +} + +impl Stream for NLJStream { + type Item = Result<RecordBatch>; + + /// # Design + /// + /// The high-level control flow for this operator is: + /// 1. Buffer all batches from the left side (unless memory limit is reached, + /// in which case see notes at 'Memory-limited Execution'). + /// - Rationale: The right side scanning can be expensive (it might + /// include decoding Parquet files), so it tries to buffer more left + /// batches at once to minimize the scan passes. + /// 2. Read right side batch one at a time. For each iteration, it only + /// evaluates the join filter on (1-left-row x right-batch), and puts the + /// result into the output buffer. Once the output buffer has reached + /// the threshold, output immediately. + /// - Rationale: Making the intermediate data smaller can 1) be more cache + /// friendly for processing to execute faster, and 2) use less memory. + /// + /// Note: Currently, both the filter-evaluation granularity and output + /// buffer size are `batch_size` from the configuration (default 8192). + /// We might try to tune it slightly for performance in the future. + /// + /// + /// + /// # Memory-limited Execution + /// + /// TODO. + /// The idea is each time buffer as much batches from the left side as + /// possible, then scan the right side once for all buffered left data. + /// Then buffer another left batches, scan right side again until finish. + /// + /// + /// + /// # Implementation + /// + /// This function is the entry point of NLJ operator's state machine + /// transitions. The rough state transition graph is as follow, for more + /// details see the comment in each state's matching arm. 
+ /// + /// Draft state transition graph: + /// + /// (start) --> BufferingLeft + /// ---------------------------- + /// BufferingLeft → FetchingRight + /// + /// FetchingRight → ProbeRight (if right batch available) + /// FetchingRight → EmitLeftUnmatched (if right exhausted) + /// + /// ProbeRight → ProbeRight (next left row or after yielding output) + /// ProbeRight → EmitRightUnmatched (for special join types like right join) + /// ProbeRight → FetchingRight (done with the current right batch) + /// + /// EmitRightUnmatched → FetchingRight + /// + /// EmitLeftUnmatched → EmitLeftUnmatched (only process 1 chunk for each + /// iteration) + /// EmitLeftUnmatched → Done (if finished) + /// ---------------------------- + /// Done → (end) + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<Option<Self::Item>> { + loop { + match self.state { + // # NLJState transitions + // --> FetchingRight + // This state will prepare the left side batches, next state + // `FetchingRight` is responsible for preparing a single probe + // side batch, before start joining. + NLJState::BufferingLeft => { + debug!("[NLJState] Entering: {:?}", self.state); + match ready!(self.inner_table.get_shared(cx)) { + Ok(left_data) => { + self.buffered_left_data = Some(left_data); + // TOOD: implement memory-limited case + self.left_exhausted = true; + self.state = NLJState::FetchingRight; + continue; + } + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + // # NLJState transitions: + // 1. --> ProbeRight + // Start processing the join for the newly fetched right + // batch. + // 2. --> EmitLeftUnmatched: When the right side input is exhausted, (maybe) emit + // unmatched left side rows. 
+ // + // After fetching a new batch from the right side, it will + // process all rows from the buffered left data: + // ```text + // for batch in right_side: + // for row in left_buffer: + // join(batch, row) + // ``` + // Note: the implementation does this step incrementally, + // instead of materializing all intermediate Cartesian products + // at once in memory. + // + // So after the right side input is exhausted, the join phase + // for the current buffered left data is finished. We can go to + // the next `EmitLeftUnmatched` phase to check if there is any + // special handling (e.g., in cases like left join). + NLJState::FetchingRight => { + debug!("[NLJState] Entering: {:?}", self.state); + match ready!(self.outer_table.poll_next_unpin(cx)) { + Some(Ok(right_batch)) => { + let right_batch_size = right_batch.num_rows(); + + // Skip the empty batch + if right_batch_size == 0 { + continue; + } + + self.current_right_batch = Some(right_batch); + + // Prepare right bitmap + if self.should_track_unmatched_right { + let new_size = right_batch_size; + let zeroed_buf = MutableBuffer::from_len_zeroed(new_size); + self.current_right_batch_matched = + Some(BooleanBufferBuilder::new_from_buffer( + zeroed_buf, new_size, + )); + } + + self.l_index = 0; + self.state = NLJState::ProbeRight; + continue; + } + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + None => { + // Right stream exhausted/as + self.state = NLJState::EmitLeftUnmatched; + continue; + } + } + } + + // NLJState transitions: + // 1. --> ProbeRight(1) + // If we have already buffered enough output to yield, it + // will first give back control to the parent state machine, + // then resume at the same place. + // 2. --> ProbeRight(2) + // After probing one right batch, and evaluating the + // join filter on (left-row x right-batch), it will advance + // to the next left row, then re-enter the current state and + // continue joining. + // 3. 
--> FetchRight + // After it has done with the current right batch (to join + // with all rows in the left buffer), it will go to + // FetchRight state to check what to do next. + NLJState::ProbeRight => { + debug!("[NLJState] Entering: {:?}", self.state); + // Return any completed batches first + if self.output_buffer.has_completed_batch() { + if let Some(batch) = self.output_buffer.next_completed_batch() { + let poll = Poll::Ready(Some(Ok(batch))); + return self.join_metrics.baseline.record_poll(poll); + } + } + + // Process current probe state + match self.process_probe_batch() { + // State unchanged (ProbeRight) + // Continue probing until we have done joining the + // current right batch with all buffered left rows. + Ok(true) => continue, + // To next FetchRightState + // We have finished joining + // (cur_right_batch x buffered_left_batches) + Ok(false) => { + // Left exhausted, transition to FetchingRight + // TODO(polish): use a flag for clarity + self.l_index = 0; + if self.current_right_batch_matched.is_some() { + // Don't reset current_right_batch, it'll be + // cleared inside `EmitRightUnmatched` state + self.state = NLJState::EmitRightUnmatched; + } else { + self.current_right_batch = None; + self.state = NLJState::FetchingRight; + } + continue; + } + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + // In the `current_right_batch_matched` bitmap, all trues mean + // it has been outputed by the join. In this state we have to + // output unmatched rows for current right batch (with null + // padding for left relation) + // Precondition: we have checked the join type so that it's + // possible to output right unmatched (e.g. 
it's right join) + NLJState::EmitRightUnmatched => { + debug!("[NLJState] Entering: {:?}", self.state); + debug_assert!(self.current_right_batch.is_some()); + debug_assert!(self.current_right_batch_matched.is_some()); + + // Construct the result batch for unmatched right rows using a utility function + let result_batch = self.process_right_unmatched()?; + self.output_buffer.push_batch(result_batch)?; + + // Processed all in one pass + // cleared inside `process_right_unmatched` + debug_assert!(self.current_right_batch.is_none()); + self.state = NLJState::FetchingRight; + } + + // NLJState transitions: + // 1. --> EmitLeftUnmatched(1) + // If we have already buffered enough output to yield, it + // will first give back control to the parent state machine, + // then resume at the same place. + // 2. --> EmitLeftUnmatched(2) + // After processing some unmatched rows, it will re-enter + // the same state, to check if there are any more final + // results to output. + // 3. --> Done + // It has processed all data, go to the final state and ready + // to exit. + // + // TODO: For memory-limited case, go back to `BufferingLeft` + // state again. 
+ NLJState::EmitLeftUnmatched => { + debug!("[NLJState] Entering: {:?}", self.state); + // Return any completed batches first + if self.output_buffer.has_completed_batch() { + if let Some(batch) = self.output_buffer.next_completed_batch() { + let poll = Poll::Ready(Some(Ok(batch))); + return self.join_metrics.baseline.record_poll(poll); + } + } + + // Process current unmatched state + match self.process_left_unmatched() { + // State unchanged (EmitLeftUnmatched) + // Continue processing until we have processed all unmatched rows + Ok(true) => continue, + // To Done state + // We have finished processing all unmatched rows + Ok(false) => { + self.output_buffer.finish_buffered_batch()?; + self.state = NLJState::Done; + continue; + } + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + // The final state and the exit point + NLJState::Done => { + debug!("[NLJState] Entering: {:?}", self.state); + // Return any remaining completed batches before final termination + if self.output_buffer.has_completed_batch() { + if let Some(batch) = self.output_buffer.next_completed_batch() { + let poll = Poll::Ready(Some(Ok(batch))); + return self.join_metrics.baseline.record_poll(poll); + } + } + + // HACK for the doc test in https://github.com/apache/datafusion/blob/main/datafusion/core/src/dataframe/mod.rs#L1265 + // If this operator directly return `Poll::Ready(None)` + // for empty result, the final result will become an empty + // batch with empty schema, however the expected result + // should be with the expected schema for this operator + if !self.handled_empty_output { + let zero_count = Count::new(); + if *self.join_metrics.baseline.output_rows() == zero_count { + let empty_batch = + RecordBatch::new_empty(Arc::clone(&self.output_schema)); + self.handled_empty_output = true; + return Poll::Ready(Some(Ok(empty_batch))); + } + } + + return Poll::Ready(None); + } + } + } + } +} + +impl RecordBatchStream for NLJStream { + fn schema(&self) -> SchemaRef { + 
Arc::clone(&self.output_schema) + } +} + +impl NLJStream { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + schema: Arc<Schema>, + filter: Option<JoinFilter>, + join_type: JoinType, + outer_table: SendableRecordBatchStream, + inner_table: OnceFut<JoinLeftData>, + column_indices: Vec<ColumnIndex>, + join_metrics: BuildProbeJoinMetrics, + cfg_batch_size: usize, + ) -> Self { + let should_track_unmatched_right = matches!( + join_type, + JoinType::Full + | JoinType::Right + | JoinType::RightAnti + | JoinType::RightMark + | JoinType::RightSemi + ); + + Self { + output_schema: Arc::clone(&schema), + join_filter: filter, + join_type, + outer_table, + column_indices, + inner_table, + join_metrics, + buffered_left_data: None, + output_buffer: Box::new(BatchCoalescer::new(schema, cfg_batch_size)), + cfg_batch_size, + current_right_batch: None, + current_right_batch_matched: None, + state: NLJState::BufferingLeft, + l_index: 0, + emit_cursor: 0, + left_exhausted: false, + left_buffered_in_one_pass: true, + handled_empty_output: false, + should_track_unmatched_right, + } + } + + // ==== Core logic handling for each state ==== + + /// Returns bool to indicate should it continue probing + /// true -> continue in the same ProbeRight state + /// false -> It has done with the (buffered_left x cur_right_batch), go to + /// next state (ProbeRight) + fn process_probe_batch(&mut self) -> Result<bool> { + let left_data = + Arc::clone(self.buffered_left_data.as_ref().ok_or_else(|| { + internal_datafusion_err!("LeftData should be available") + })?); + let right_batch = self + .current_right_batch + .as_ref() + .ok_or_else(|| internal_datafusion_err!("Right batch should be available"))? 
+ .clone(); + + // stop probing, the caller will go to the next state + if self.l_index >= left_data.batch().num_rows() { + return Ok(false); + } + + // ======== + // Join (l_row x right_batch) + // and push the result into output_buffer + // ======== + + let l_idx = self.l_index; + let join_batch = + self.process_single_left_row_join(&left_data, &right_batch, l_idx)?; + + self.output_buffer.push_batch(join_batch)?; + + // ==== Prepare for the next iteration ==== + + // Advance left cursor + self.l_index += 1; + + // Return true to continue probing + Ok(true) + } + + /// Process a single left row join with the current right batch. + /// Returns a RecordBatch containing the join results (may be empty). + fn process_single_left_row_join( + &mut self, + left_data: &JoinLeftData, + right_batch: &RecordBatch, + l_index: usize, + ) -> Result<RecordBatch> { + let right_row_count = right_batch.num_rows(); + if right_row_count == 0 { + return Ok(RecordBatch::new_empty(Arc::clone(&self.output_schema))); + } + + // Create indices for cross-join: current left row with all right rows + let left_indices = UInt64Array::from(vec![l_index as u64; right_row_count]); + let right_indices = UInt32Array::from_iter_values(0..right_row_count as u32); + + // Apply join filter if present + let (joined_left_indices, joined_right_indices) = + if let Some(ref filter) = self.join_filter { + apply_join_filter_to_indices( + left_data.batch(), + right_batch, + left_indices, + right_indices, + filter, + JoinSide::Left, + None, + )? 
+ } else { + (left_indices, right_indices) + }; + + // Update left row match bitmap for outer join support + if need_produce_result_in_final(self.join_type) && !joined_left_indices.is_empty() + { + let mut bitmap = left_data.bitmap().lock(); + bitmap.set_bit(l_index, true); + } + + // TODO(now-perf): better vectorize it + if let Some(bitmap) = self.current_right_batch_matched.as_mut() { + for i in joined_right_indices.iter() { + // After the initial join, indices must all be Some + bitmap.set_bit(i.unwrap() as usize, true); + // println!("Setting bit {i:?} to true"); + } + } + + // For the following join types: here we only have to set the left/right + // bitmap, and no need to output result + if matches!( + self.join_type, + JoinType::LeftAnti + | JoinType::LeftSemi + | JoinType::LeftMark + | JoinType::RightAnti + | JoinType::RightMark + | JoinType::RightSemi + ) { + return Ok(RecordBatch::new_empty(Arc::clone(&self.output_schema))); Review Comment: Maybe we can change the return type of this function to `Result<Option<RecordBatch>>`, so that the join types that produce no output here (the anti/semi/mark cases above) can return `None` instead of allocating an empty batch. ########## datafusion/physical-plan/src/joins/nlj.rs: ########## @@ -0,0 +1,803 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Implementation of the Nested Loop Join operator.
+//! +//! For detailed information regarding the operator's state machine and execution flow, +//! please refer to the documentation provided in the `poll_next()` method. + +use arrow::buffer::MutableBuffer; +use arrow::compute::BatchCoalescer; +use futures::{ready, StreamExt}; +use log::debug; +use std::sync::Arc; +use std::task::Poll; + +use crate::joins::nested_loop_join::JoinLeftData; +use crate::joins::utils::{ + apply_join_filter_to_indices, build_batch_from_indices_maybe_empty, + need_produce_result_in_final, BuildProbeJoinMetrics, ColumnIndex, JoinFilter, + OnceFut, +}; +use crate::metrics::Count; +use crate::{RecordBatchStream, SendableRecordBatchStream}; + +use arrow::array::{ + BooleanArray, BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array, + UInt64Builder, +}; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use datafusion_common::{ + internal_datafusion_err, unwrap_or_internal_err, DataFusionError, JoinSide, Result, +}; +use datafusion_expr::JoinType; + +use futures::Stream; + +/// States for join processing. See `poll_next()` comment for more details about +/// state transitions. 
+#[derive(Debug, Clone, Copy)] +enum NLJState { + BufferingLeft, + FetchingRight, + ProbeRight, + EmitRightUnmatched, + EmitLeftUnmatched, + Done, +} + +pub(crate) struct NLJStream { + // ======================================================================== + // PROPERTIES: + // Operator's properties that remain constant + // ======================================================================== + /// Output schema + pub(crate) output_schema: Arc<Schema>, + /// join filter + pub(crate) join_filter: Option<JoinFilter>, + /// type of the join + pub(crate) join_type: JoinType, + /// the outer table data of the nested loop join + pub(crate) outer_table: SendableRecordBatchStream, + /// the inner table data of the nested loop join + pub(crate) inner_table: OnceFut<JoinLeftData>, + /// Information of index and left / right placement of columns + pub(crate) column_indices: Vec<ColumnIndex>, + /// Join execution metrics + pub(crate) join_metrics: BuildProbeJoinMetrics, + + /// `batch_size` from configuration + cfg_batch_size: usize, + + /// Should we use a bitmap to track each incoming right batch's each row's + /// 'joined' status. + /// For example in right joins, we have to use a bit map to track matched + /// right side rows, and later enter a `EmitRightUnmatched` stage to emit + /// unmatched right rows. + should_track_unmatched_right: bool, + + // ======================================================================== + // STATE FLAGS/BUFFERS: + // Fields that hold intermediate data/flags during execution + // ======================================================================== + /// State Tracking + state: NLJState, + /// Output buffer holds the join result to output. It will emit eagerly when + /// the threshold is reached. 
+ output_buffer: Box<BatchCoalescer>, + /// See comments in `NLJState::Done` for its purpose + handled_empty_output: bool, + + // Buffer(left) side + // ----------------- + /// The current buffered left data to join + buffered_left_data: Option<Arc<JoinLeftData>>, + /// Index into the left buffered batch. Used in `ProbeRight` state + l_index: usize, + /// Index into the left buffered batch. Used in `EmitLeftUnmatched` state + emit_cursor: u64, + /// Should we go back to `BufferingLeft` state again after `EmitLeftUnmatched` + /// state is over. + left_exhausted: bool, + /// If we can buffer all left data in one pass + /// TODO(now): this is for the (unimplemented) memory-limited execution + #[allow(dead_code)] + left_buffered_in_one_pass: bool, + + // Probe(right) side + // ----------------- + /// The current probe batch to process + current_right_batch: Option<RecordBatch>, + // For right join, keep track of matched rows in `current_right_batch` + // Constructed when fetching each new incoming right batch in `FetchingRight` state. + current_right_batch_matched: Option<BooleanBufferBuilder>, +} + +impl Stream for NLJStream { + type Item = Result<RecordBatch>; + + /// # Design + /// + /// The high-level control flow for this operator is: + /// 1. Buffer all batches from the left side (unless memory limit is reached, + /// in which case see notes at 'Memory-limited Execution'). + /// - Rationale: The right side scanning can be expensive (it might + /// include decoding Parquet files), so it tries to buffer more left + /// batches at once to minimize the scan passes. + /// 2. Read right side batch one at a time. For each iteration, it only + /// evaluates the join filter on (1-left-row x right-batch), and puts the + /// result into the output buffer. Once the output buffer has reached + /// the threshold, output immediately. 
+ /// - Rationale: Making the intermediate data smaller can 1) be more cache + /// friendly for processing to execute faster, and 2) use less memory. + /// + /// Note: Currently, both the filter-evaluation granularity and output + /// buffer size are `batch_size` from the configuration (default 8192). + /// We might try to tune it slightly for performance in the future. + /// + /// + /// + /// # Memory-limited Execution + /// + /// TODO. + /// The idea is each time buffer as much batches from the left side as + /// possible, then scan the right side once for all buffered left data. + /// Then buffer another left batches, scan right side again until finish. + /// + /// + /// + /// # Implementation + /// + /// This function is the entry point of NLJ operator's state machine + /// transitions. The rough state transition graph is as follow, for more + /// details see the comment in each state's matching arm. + /// + /// Draft state transition graph: + /// + /// (start) --> BufferingLeft + /// ---------------------------- + /// BufferingLeft → FetchingRight + /// + /// FetchingRight → ProbeRight (if right batch available) + /// FetchingRight → EmitLeftUnmatched (if right exhausted) + /// + /// ProbeRight → ProbeRight (next left row or after yielding output) + /// ProbeRight → EmitRightUnmatched (for special join types like right join) + /// ProbeRight → FetchingRight (done with the current right batch) + /// + /// EmitRightUnmatched → FetchingRight + /// + /// EmitLeftUnmatched → EmitLeftUnmatched (only process 1 chunk for each + /// iteration) + /// EmitLeftUnmatched → Done (if finished) + /// ---------------------------- + /// Done → (end) + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<Option<Self::Item>> { + loop { + match self.state { + // # NLJState transitions + // --> FetchingRight + // This state will prepare the left side batches, next state + // `FetchingRight` is responsible for preparing a single probe + 
// side batch, before start joining. + NLJState::BufferingLeft => { + debug!("[NLJState] Entering: {:?}", self.state); + match ready!(self.inner_table.get_shared(cx)) { + Ok(left_data) => { + self.buffered_left_data = Some(left_data); + // TOOD: implement memory-limited case + self.left_exhausted = true; + self.state = NLJState::FetchingRight; + continue; + } + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + // # NLJState transitions: + // 1. --> ProbeRight + // Start processing the join for the newly fetched right + // batch. + // 2. --> EmitLeftUnmatched: When the right side input is exhausted, (maybe) emit + // unmatched left side rows. + // + // After fetching a new batch from the right side, it will + // process all rows from the buffered left data: + // ```text + // for batch in right_side: + // for row in left_buffer: + // join(batch, row) + // ``` + // Note: the implementation does this step incrementally, + // instead of materializing all intermediate Cartesian products + // at once in memory. + // + // So after the right side input is exhausted, the join phase + // for the current buffered left data is finished. We can go to + // the next `EmitLeftUnmatched` phase to check if there is any + // special handling (e.g., in cases like left join). 
+ NLJState::FetchingRight => { + debug!("[NLJState] Entering: {:?}", self.state); + match ready!(self.outer_table.poll_next_unpin(cx)) { + Some(Ok(right_batch)) => { + let right_batch_size = right_batch.num_rows(); + + // Skip the empty batch + if right_batch_size == 0 { + continue; + } + + self.current_right_batch = Some(right_batch); + + // Prepare right bitmap + if self.should_track_unmatched_right { + let new_size = right_batch_size; + let zeroed_buf = MutableBuffer::from_len_zeroed(new_size); + self.current_right_batch_matched = + Some(BooleanBufferBuilder::new_from_buffer( + zeroed_buf, new_size, + )); + } + + self.l_index = 0; + self.state = NLJState::ProbeRight; + continue; + } + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + None => { + // Right stream exhausted/as + self.state = NLJState::EmitLeftUnmatched; + continue; + } + } + } + + // NLJState transitions: + // 1. --> ProbeRight(1) + // If we have already buffered enough output to yield, it + // will first give back control to the parent state machine, + // then resume at the same place. + // 2. --> ProbeRight(2) + // After probing one right batch, and evaluating the + // join filter on (left-row x right-batch), it will advance + // to the next left row, then re-enter the current state and + // continue joining. + // 3. --> FetchRight + // After it has done with the current right batch (to join + // with all rows in the left buffer), it will go to + // FetchRight state to check what to do next. 
+ NLJState::ProbeRight => { + debug!("[NLJState] Entering: {:?}", self.state); + // Return any completed batches first + if self.output_buffer.has_completed_batch() { + if let Some(batch) = self.output_buffer.next_completed_batch() { + let poll = Poll::Ready(Some(Ok(batch))); + return self.join_metrics.baseline.record_poll(poll); + } + } + + // Process current probe state + match self.process_probe_batch() { + // State unchanged (ProbeRight) + // Continue probing until we have done joining the + // current right batch with all buffered left rows. + Ok(true) => continue, + // To next FetchRightState + // We have finished joining + // (cur_right_batch x buffered_left_batches) + Ok(false) => { + // Left exhausted, transition to FetchingRight + // TODO(polish): use a flag for clarity + self.l_index = 0; + if self.current_right_batch_matched.is_some() { + // Don't reset current_right_batch, it'll be + // cleared inside `EmitRightUnmatched` state + self.state = NLJState::EmitRightUnmatched; + } else { + self.current_right_batch = None; + self.state = NLJState::FetchingRight; + } + continue; + } + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + // In the `current_right_batch_matched` bitmap, all trues mean + // it has been outputed by the join. In this state we have to + // output unmatched rows for current right batch (with null + // padding for left relation) + // Precondition: we have checked the join type so that it's + // possible to output right unmatched (e.g. 
it's right join) + NLJState::EmitRightUnmatched => { + debug!("[NLJState] Entering: {:?}", self.state); + debug_assert!(self.current_right_batch.is_some()); + debug_assert!(self.current_right_batch_matched.is_some()); + + // Construct the result batch for unmatched right rows using a utility function + let result_batch = self.process_right_unmatched()?; + self.output_buffer.push_batch(result_batch)?; + + // Processed all in one pass + // cleared inside `process_right_unmatched` + debug_assert!(self.current_right_batch.is_none()); + self.state = NLJState::FetchingRight; + } + + // NLJState transitions: + // 1. --> EmitLeftUnmatched(1) + // If we have already buffered enough output to yield, it + // will first give back control to the parent state machine, + // then resume at the same place. + // 2. --> EmitLeftUnmatched(2) + // After processing some unmatched rows, it will re-enter + // the same state, to check if there are any more final + // results to output. + // 3. --> Done + // It has processed all data, go to the final state and ready + // to exit. + // + // TODO: For memory-limited case, go back to `BufferingLeft` + // state again. 
+ NLJState::EmitLeftUnmatched => { + debug!("[NLJState] Entering: {:?}", self.state); + // Return any completed batches first + if self.output_buffer.has_completed_batch() { + if let Some(batch) = self.output_buffer.next_completed_batch() { + let poll = Poll::Ready(Some(Ok(batch))); + return self.join_metrics.baseline.record_poll(poll); + } + } + + // Process current unmatched state + match self.process_left_unmatched() { + // State unchanged (EmitLeftUnmatched) + // Continue processing until we have processed all unmatched rows + Ok(true) => continue, + // To Done state + // We have finished processing all unmatched rows + Ok(false) => { + self.output_buffer.finish_buffered_batch()?; + self.state = NLJState::Done; + continue; + } + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + // The final state and the exit point + NLJState::Done => { + debug!("[NLJState] Entering: {:?}", self.state); + // Return any remaining completed batches before final termination + if self.output_buffer.has_completed_batch() { + if let Some(batch) = self.output_buffer.next_completed_batch() { + let poll = Poll::Ready(Some(Ok(batch))); + return self.join_metrics.baseline.record_poll(poll); + } + } + + // HACK for the doc test in https://github.com/apache/datafusion/blob/main/datafusion/core/src/dataframe/mod.rs#L1265 + // If this operator directly return `Poll::Ready(None)` + // for empty result, the final result will become an empty + // batch with empty schema, however the expected result + // should be with the expected schema for this operator + if !self.handled_empty_output { + let zero_count = Count::new(); + if *self.join_metrics.baseline.output_rows() == zero_count { + let empty_batch = + RecordBatch::new_empty(Arc::clone(&self.output_schema)); + self.handled_empty_output = true; + return Poll::Ready(Some(Ok(empty_batch))); + } + } + + return Poll::Ready(None); + } + } + } + } +} + +impl RecordBatchStream for NLJStream { + fn schema(&self) -> SchemaRef { + 
Arc::clone(&self.output_schema) + } +} + +impl NLJStream { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + schema: Arc<Schema>, + filter: Option<JoinFilter>, + join_type: JoinType, + outer_table: SendableRecordBatchStream, + inner_table: OnceFut<JoinLeftData>, + column_indices: Vec<ColumnIndex>, + join_metrics: BuildProbeJoinMetrics, + cfg_batch_size: usize, + ) -> Self { + let should_track_unmatched_right = matches!( + join_type, + JoinType::Full + | JoinType::Right + | JoinType::RightAnti + | JoinType::RightMark + | JoinType::RightSemi + ); + + Self { + output_schema: Arc::clone(&schema), + join_filter: filter, + join_type, + outer_table, + column_indices, + inner_table, + join_metrics, + buffered_left_data: None, + output_buffer: Box::new(BatchCoalescer::new(schema, cfg_batch_size)), + cfg_batch_size, + current_right_batch: None, + current_right_batch_matched: None, + state: NLJState::BufferingLeft, + l_index: 0, + emit_cursor: 0, + left_exhausted: false, + left_buffered_in_one_pass: true, + handled_empty_output: false, + should_track_unmatched_right, + } + } + + // ==== Core logic handling for each state ==== + + /// Returns bool to indicate should it continue probing + /// true -> continue in the same ProbeRight state + /// false -> It has done with the (buffered_left x cur_right_batch), go to + /// next state (ProbeRight) + fn process_probe_batch(&mut self) -> Result<bool> { + let left_data = + Arc::clone(self.buffered_left_data.as_ref().ok_or_else(|| { + internal_datafusion_err!("LeftData should be available") + })?); + let right_batch = self + .current_right_batch + .as_ref() + .ok_or_else(|| internal_datafusion_err!("Right batch should be available"))? 
+ .clone(); + + // stop probing, the caller will go to the next state + if self.l_index >= left_data.batch().num_rows() { + return Ok(false); + } + + // ======== + // Join (l_row x right_batch) + // and push the result into output_buffer + // ======== + + let l_idx = self.l_index; + let join_batch = + self.process_single_left_row_join(&left_data, &right_batch, l_idx)?; + + self.output_buffer.push_batch(join_batch)?; + + // ==== Prepare for the next iteration ==== + + // Advance left cursor + self.l_index += 1; + + // Return true to continue probing + Ok(true) + } + + /// Process a single left row join with the current right batch. + /// Returns a RecordBatch containing the join results (may be empty). + fn process_single_left_row_join( + &mut self, + left_data: &JoinLeftData, + right_batch: &RecordBatch, + l_index: usize, + ) -> Result<RecordBatch> { + let right_row_count = right_batch.num_rows(); + if right_row_count == 0 { + return Ok(RecordBatch::new_empty(Arc::clone(&self.output_schema))); + } + + // Create indices for cross-join: current left row with all right rows + let left_indices = UInt64Array::from(vec![l_index as u64; right_row_count]); + let right_indices = UInt32Array::from_iter_values(0..right_row_count as u32); + + // Apply join filter if present + let (joined_left_indices, joined_right_indices) = + if let Some(ref filter) = self.join_filter { + apply_join_filter_to_indices( + left_data.batch(), + right_batch, + left_indices, + right_indices, + filter, + JoinSide::Left, + None, + )? 
+ } else { + (left_indices, right_indices) + }; + + // Update left row match bitmap for outer join support + if need_produce_result_in_final(self.join_type) && !joined_left_indices.is_empty() + { + let mut bitmap = left_data.bitmap().lock(); + bitmap.set_bit(l_index, true); + } + + // TODO(now-perf): better vectorize it + if let Some(bitmap) = self.current_right_batch_matched.as_mut() { + for i in joined_right_indices.iter() { + // After the initial join, indices must all be Some + bitmap.set_bit(i.unwrap() as usize, true); + // println!("Setting bit {i:?} to true"); + } + } + + // For the following join types: here we only have to set the left/right + // bitmap, and no need to output result + if matches!( + self.join_type, + JoinType::LeftAnti + | JoinType::LeftSemi + | JoinType::LeftMark + | JoinType::RightAnti + | JoinType::RightMark + | JoinType::RightSemi + ) { + return Ok(RecordBatch::new_empty(Arc::clone(&self.output_schema))); + } + + // TODO(now): check if we're missed something inside the original + // logic inside `adjust_indices_by_join_types` + + // Build output batch from matching indices + if !joined_left_indices.is_empty() { + let join_batch = build_batch_from_indices_maybe_empty( Review Comment: since we check `joined_left_indices.is_empty`, I think we can use `build_batch_from_indices` directly -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org