2010YOUY01 commented on code in PR #17482:
URL: https://github.com/apache/datafusion/pull/17482#discussion_r2425364152


##########
datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs:
##########
@@ -0,0 +1,1533 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream Implementation for PiecewiseMergeJoin's Classic Join (Left, Right, 
Full, Inner)
+
+use arrow::array::{new_null_array, Array, PrimitiveBuilder};
+use arrow::compute::{take, BatchCoalescer};
+use arrow::datatypes::UInt32Type;
+use arrow::{
+    array::{ArrayRef, RecordBatch, UInt32Array},
+    compute::{sort_to_indices, take_record_batch},
+};
+use arrow_schema::{Schema, SchemaRef, SortOptions};
+use datafusion_common::NullEquality;
+use datafusion_common::{exec_err, internal_err, Result};
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_expr::{JoinType, Operator};
+use datafusion_physical_expr::PhysicalExprRef;
+use futures::{Stream, StreamExt};
+use std::sync::atomic::AtomicUsize;
+use std::{cmp::Ordering, task::ready};
+use std::{sync::Arc, task::Poll};
+
+use crate::handle_state;
+use crate::joins::piecewise_merge_join::exec::{BufferedSide, 
BufferedSideReadyState};
+use crate::joins::piecewise_merge_join::utils::need_produce_result_in_final;
+use crate::joins::utils::{compare_join_arrays, 
get_final_indices_from_shared_bitmap};
+use crate::joins::utils::{BuildProbeJoinMetrics, StatefulStreamResult};
+
+pub(super) enum PiecewiseMergeJoinStreamState {
+    WaitBufferedSide,
+    FetchStreamBatch,
+    ProcessStreamBatch(StreamedBatch),
+    ExhaustedStreamSide,

Review Comment:
   I think `ProcessUnmatched` is better, this `ExhaustedStreamSide` looks the 
same as `Completed` state.



##########
datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs:
##########
@@ -0,0 +1,1533 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream Implementation for PiecewiseMergeJoin's Classic Join (Left, Right, 
Full, Inner)
+
+use arrow::array::{new_null_array, Array, PrimitiveBuilder};
+use arrow::compute::{take, BatchCoalescer};
+use arrow::datatypes::UInt32Type;
+use arrow::{
+    array::{ArrayRef, RecordBatch, UInt32Array},
+    compute::{sort_to_indices, take_record_batch},
+};
+use arrow_schema::{Schema, SchemaRef, SortOptions};
+use datafusion_common::NullEquality;
+use datafusion_common::{exec_err, internal_err, Result};
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_expr::{JoinType, Operator};
+use datafusion_physical_expr::PhysicalExprRef;
+use futures::{Stream, StreamExt};
+use std::sync::atomic::AtomicUsize;
+use std::{cmp::Ordering, task::ready};
+use std::{sync::Arc, task::Poll};
+
+use crate::handle_state;
+use crate::joins::piecewise_merge_join::exec::{BufferedSide, 
BufferedSideReadyState};
+use crate::joins::piecewise_merge_join::utils::need_produce_result_in_final;
+use crate::joins::utils::{compare_join_arrays, 
get_final_indices_from_shared_bitmap};
+use crate::joins::utils::{BuildProbeJoinMetrics, StatefulStreamResult};
+
+pub(super) enum PiecewiseMergeJoinStreamState {
+    WaitBufferedSide,
+    FetchStreamBatch,
+    ProcessStreamBatch(StreamedBatch),
+    ExhaustedStreamSide,
+    Completed,
+}
+
+impl PiecewiseMergeJoinStreamState {
+    // Grab mutable reference to the current stream batch
+    fn try_as_process_stream_batch_mut(&mut self) -> Result<&mut 
StreamedBatch> {
+        match self {
+            PiecewiseMergeJoinStreamState::ProcessStreamBatch(state) => 
Ok(state),
+            _ => internal_err!("Expected streamed batch in StreamBatch"),
+        }
+    }
+}
+
+pub(super) struct StreamedBatch {
+    pub batch: RecordBatch,
+    values: Vec<ArrayRef>,

Review Comment:
   `values` looks too generic, how about `compare_key_values`? Also, why is it 
a `Vec`, it should be only a single value, right?



##########
datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs:
##########
@@ -0,0 +1,1533 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream Implementation for PiecewiseMergeJoin's Classic Join (Left, Right, 
Full, Inner)
+
+use arrow::array::{new_null_array, Array, PrimitiveBuilder};
+use arrow::compute::{take, BatchCoalescer};
+use arrow::datatypes::UInt32Type;
+use arrow::{
+    array::{ArrayRef, RecordBatch, UInt32Array},
+    compute::{sort_to_indices, take_record_batch},
+};
+use arrow_schema::{Schema, SchemaRef, SortOptions};
+use datafusion_common::NullEquality;
+use datafusion_common::{exec_err, internal_err, Result};
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_expr::{JoinType, Operator};
+use datafusion_physical_expr::PhysicalExprRef;
+use futures::{Stream, StreamExt};
+use std::sync::atomic::AtomicUsize;
+use std::{cmp::Ordering, task::ready};
+use std::{sync::Arc, task::Poll};
+
+use crate::handle_state;
+use crate::joins::piecewise_merge_join::exec::{BufferedSide, 
BufferedSideReadyState};
+use crate::joins::piecewise_merge_join::utils::need_produce_result_in_final;
+use crate::joins::utils::{compare_join_arrays, 
get_final_indices_from_shared_bitmap};
+use crate::joins::utils::{BuildProbeJoinMetrics, StatefulStreamResult};
+
+pub(super) enum PiecewiseMergeJoinStreamState {
+    WaitBufferedSide,
+    FetchStreamBatch,
+    ProcessStreamBatch(StreamedBatch),
+    ExhaustedStreamSide,
+    Completed,
+}
+
+impl PiecewiseMergeJoinStreamState {
+    // Grab mutable reference to the current stream batch
+    fn try_as_process_stream_batch_mut(&mut self) -> Result<&mut 
StreamedBatch> {
+        match self {
+            PiecewiseMergeJoinStreamState::ProcessStreamBatch(state) => 
Ok(state),
+            _ => internal_err!("Expected streamed batch in StreamBatch"),
+        }
+    }
+}
+
+pub(super) struct StreamedBatch {

Review Comment:
   We can rename it to `SortedStreamBatch`



##########
datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs:
##########
@@ -0,0 +1,1533 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream Implementation for PiecewiseMergeJoin's Classic Join (Left, Right, 
Full, Inner)
+
+use arrow::array::{new_null_array, Array, PrimitiveBuilder};
+use arrow::compute::{take, BatchCoalescer};
+use arrow::datatypes::UInt32Type;
+use arrow::{
+    array::{ArrayRef, RecordBatch, UInt32Array},
+    compute::{sort_to_indices, take_record_batch},
+};
+use arrow_schema::{Schema, SchemaRef, SortOptions};
+use datafusion_common::NullEquality;
+use datafusion_common::{exec_err, internal_err, Result};
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_expr::{JoinType, Operator};
+use datafusion_physical_expr::PhysicalExprRef;
+use futures::{Stream, StreamExt};
+use std::sync::atomic::AtomicUsize;
+use std::{cmp::Ordering, task::ready};
+use std::{sync::Arc, task::Poll};
+
+use crate::handle_state;
+use crate::joins::piecewise_merge_join::exec::{BufferedSide, 
BufferedSideReadyState};
+use crate::joins::piecewise_merge_join::utils::need_produce_result_in_final;
+use crate::joins::utils::{compare_join_arrays, 
get_final_indices_from_shared_bitmap};
+use crate::joins::utils::{BuildProbeJoinMetrics, StatefulStreamResult};
+
+pub(super) enum PiecewiseMergeJoinStreamState {
+    WaitBufferedSide,
+    FetchStreamBatch,
+    ProcessStreamBatch(StreamedBatch),
+    ExhaustedStreamSide,
+    Completed,
+}
+
+impl PiecewiseMergeJoinStreamState {
+    // Grab mutable reference to the current stream batch
+    fn try_as_process_stream_batch_mut(&mut self) -> Result<&mut 
StreamedBatch> {
+        match self {
+            PiecewiseMergeJoinStreamState::ProcessStreamBatch(state) => 
Ok(state),
+            _ => internal_err!("Expected streamed batch in StreamBatch"),
+        }
+    }
+}
+
+pub(super) struct StreamedBatch {

Review Comment:
   ```suggestion
   /// The stream side incoming batch with required sort order.
   /// Note the compare key in the join predicate might include expressions on 
the original
   /// columns, so we store the evaluated compare key separately.
   /// e.g. For join predicate `buffer.v1 < (stream.v1 + 1)`, the `values` 
field stores the evaluted
   /// `stream.v1 + 1` array.
   pub(super) struct StreamedBatch {
   ```



##########
datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs:
##########
@@ -0,0 +1,1533 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream Implementation for PiecewiseMergeJoin's Classic Join (Left, Right, 
Full, Inner)
+
+use arrow::array::{new_null_array, Array, PrimitiveBuilder};
+use arrow::compute::{take, BatchCoalescer};
+use arrow::datatypes::UInt32Type;
+use arrow::{
+    array::{ArrayRef, RecordBatch, UInt32Array},
+    compute::{sort_to_indices, take_record_batch},
+};
+use arrow_schema::{Schema, SchemaRef, SortOptions};
+use datafusion_common::NullEquality;
+use datafusion_common::{exec_err, internal_err, Result};
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_expr::{JoinType, Operator};
+use datafusion_physical_expr::PhysicalExprRef;
+use futures::{Stream, StreamExt};
+use std::sync::atomic::AtomicUsize;
+use std::{cmp::Ordering, task::ready};
+use std::{sync::Arc, task::Poll};
+
+use crate::handle_state;
+use crate::joins::piecewise_merge_join::exec::{BufferedSide, 
BufferedSideReadyState};
+use crate::joins::piecewise_merge_join::utils::need_produce_result_in_final;
+use crate::joins::utils::{compare_join_arrays, 
get_final_indices_from_shared_bitmap};
+use crate::joins::utils::{BuildProbeJoinMetrics, StatefulStreamResult};
+
+pub(super) enum PiecewiseMergeJoinStreamState {
+    WaitBufferedSide,
+    FetchStreamBatch,
+    ProcessStreamBatch(StreamedBatch),
+    ExhaustedStreamSide,
+    Completed,
+}
+
+impl PiecewiseMergeJoinStreamState {
+    // Grab mutable reference to the current stream batch
+    fn try_as_process_stream_batch_mut(&mut self) -> Result<&mut 
StreamedBatch> {
+        match self {
+            PiecewiseMergeJoinStreamState::ProcessStreamBatch(state) => 
Ok(state),
+            _ => internal_err!("Expected streamed batch in StreamBatch"),
+        }
+    }
+}
+
+pub(super) struct StreamedBatch {
+    pub batch: RecordBatch,
+    values: Vec<ArrayRef>,
+}
+
+impl StreamedBatch {
+    #[allow(dead_code)]
+    fn new(batch: RecordBatch, values: Vec<ArrayRef>) -> Self {
+        Self { batch, values }
+    }
+
+    fn values(&self) -> &Vec<ArrayRef> {
+        &self.values
+    }
+}
+
+pub(super) struct ClassicPWMJStream {
+    // Output schema of the `PiecewiseMergeJoin`
+    pub schema: Arc<Schema>,
+
+    // Physical expression that is evaluated on the streamed side
+    // We do not need on_buffered as this is already evaluated when
+    // creating the buffered side which happens before initializing
+    // `PiecewiseMergeJoinStream`
+    pub on_streamed: PhysicalExprRef,
+    // Type of join
+    pub join_type: JoinType,
+    // Comparison operator
+    pub operator: Operator,
+    // Streamed batch
+    pub streamed: SendableRecordBatchStream,
+    // Streamed schema
+    streamed_schema: SchemaRef,
+    // Buffered side data
+    buffered_side: BufferedSide,
+    // Tracks the state of the `PiecewiseMergeJoin`
+    state: PiecewiseMergeJoinStreamState,
+    // Sort option for buffered and streamed side (specifies whether

Review Comment:
   It should be only for the stream side?



##########
datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs:
##########
@@ -0,0 +1,1533 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream Implementation for PiecewiseMergeJoin's Classic Join (Left, Right, 
Full, Inner)
+
+use arrow::array::{new_null_array, Array, PrimitiveBuilder};
+use arrow::compute::{take, BatchCoalescer};
+use arrow::datatypes::UInt32Type;
+use arrow::{
+    array::{ArrayRef, RecordBatch, UInt32Array},
+    compute::{sort_to_indices, take_record_batch},
+};
+use arrow_schema::{Schema, SchemaRef, SortOptions};
+use datafusion_common::NullEquality;
+use datafusion_common::{exec_err, internal_err, Result};
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_expr::{JoinType, Operator};
+use datafusion_physical_expr::PhysicalExprRef;
+use futures::{Stream, StreamExt};
+use std::sync::atomic::AtomicUsize;
+use std::{cmp::Ordering, task::ready};
+use std::{sync::Arc, task::Poll};
+
+use crate::handle_state;
+use crate::joins::piecewise_merge_join::exec::{BufferedSide, 
BufferedSideReadyState};
+use crate::joins::piecewise_merge_join::utils::need_produce_result_in_final;
+use crate::joins::utils::{compare_join_arrays, 
get_final_indices_from_shared_bitmap};
+use crate::joins::utils::{BuildProbeJoinMetrics, StatefulStreamResult};
+
+pub(super) enum PiecewiseMergeJoinStreamState {
+    WaitBufferedSide,
+    FetchStreamBatch,
+    ProcessStreamBatch(StreamedBatch),
+    ExhaustedStreamSide,
+    Completed,
+}
+
+impl PiecewiseMergeJoinStreamState {
+    // Grab mutable reference to the current stream batch
+    fn try_as_process_stream_batch_mut(&mut self) -> Result<&mut 
StreamedBatch> {
+        match self {
+            PiecewiseMergeJoinStreamState::ProcessStreamBatch(state) => 
Ok(state),
+            _ => internal_err!("Expected streamed batch in StreamBatch"),
+        }
+    }
+}
+
+pub(super) struct StreamedBatch {
+    pub batch: RecordBatch,
+    values: Vec<ArrayRef>,
+}
+
+impl StreamedBatch {
+    #[allow(dead_code)]
+    fn new(batch: RecordBatch, values: Vec<ArrayRef>) -> Self {
+        Self { batch, values }
+    }
+
+    fn values(&self) -> &Vec<ArrayRef> {
+        &self.values
+    }
+}
+
+pub(super) struct ClassicPWMJStream {
+    // Output schema of the `PiecewiseMergeJoin`
+    pub schema: Arc<Schema>,
+
+    // Physical expression that is evaluated on the streamed side
+    // We do not need on_buffered as this is already evaluated when
+    // creating the buffered side which happens before initializing
+    // `PiecewiseMergeJoinStream`
+    pub on_streamed: PhysicalExprRef,
+    // Type of join
+    pub join_type: JoinType,
+    // Comparison operator
+    pub operator: Operator,
+    // Streamed batch
+    pub streamed: SendableRecordBatchStream,
+    // Streamed schema
+    streamed_schema: SchemaRef,
+    // Buffered side data
+    buffered_side: BufferedSide,
+    // Tracks the state of the `PiecewiseMergeJoin`
+    state: PiecewiseMergeJoinStreamState,
+    // Sort option for buffered and streamed side (specifies whether
+    // the sort is ascending or descending)
+    sort_option: SortOptions,
+    // Metrics for build + probe joins
+    join_metrics: BuildProbeJoinMetrics,
+    // Tracking incremental state for emitting record batches
+    batch_process_state: BatchProcessState,
+    // To synchronize when partition needs to finish
+    remaining_partitions: Arc<AtomicUsize>,

Review Comment:
   I think this field is better to be put inside `BufferedSide`



##########
datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs:
##########
@@ -0,0 +1,1533 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream Implementation for PiecewiseMergeJoin's Classic Join (Left, Right, 
Full, Inner)
+
+use arrow::array::{new_null_array, Array, PrimitiveBuilder};
+use arrow::compute::{take, BatchCoalescer};
+use arrow::datatypes::UInt32Type;
+use arrow::{
+    array::{ArrayRef, RecordBatch, UInt32Array},
+    compute::{sort_to_indices, take_record_batch},
+};
+use arrow_schema::{Schema, SchemaRef, SortOptions};
+use datafusion_common::NullEquality;
+use datafusion_common::{exec_err, internal_err, Result};
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_expr::{JoinType, Operator};
+use datafusion_physical_expr::PhysicalExprRef;
+use futures::{Stream, StreamExt};
+use std::sync::atomic::AtomicUsize;
+use std::{cmp::Ordering, task::ready};
+use std::{sync::Arc, task::Poll};
+
+use crate::handle_state;
+use crate::joins::piecewise_merge_join::exec::{BufferedSide, 
BufferedSideReadyState};
+use crate::joins::piecewise_merge_join::utils::need_produce_result_in_final;
+use crate::joins::utils::{compare_join_arrays, 
get_final_indices_from_shared_bitmap};
+use crate::joins::utils::{BuildProbeJoinMetrics, StatefulStreamResult};
+
+pub(super) enum PiecewiseMergeJoinStreamState {
+    WaitBufferedSide,
+    FetchStreamBatch,
+    ProcessStreamBatch(StreamedBatch),
+    ExhaustedStreamSide,
+    Completed,
+}
+
+impl PiecewiseMergeJoinStreamState {
+    // Grab mutable reference to the current stream batch
+    fn try_as_process_stream_batch_mut(&mut self) -> Result<&mut 
StreamedBatch> {
+        match self {
+            PiecewiseMergeJoinStreamState::ProcessStreamBatch(state) => 
Ok(state),
+            _ => internal_err!("Expected streamed batch in StreamBatch"),
+        }
+    }
+}
+
+pub(super) struct StreamedBatch {
+    pub batch: RecordBatch,
+    values: Vec<ArrayRef>,
+}
+
+impl StreamedBatch {
+    #[allow(dead_code)]
+    fn new(batch: RecordBatch, values: Vec<ArrayRef>) -> Self {
+        Self { batch, values }
+    }
+
+    fn values(&self) -> &Vec<ArrayRef> {
+        &self.values
+    }
+}
+
+pub(super) struct ClassicPWMJStream {
+    // Output schema of the `PiecewiseMergeJoin`
+    pub schema: Arc<Schema>,
+
+    // Physical expression that is evaluated on the streamed side
+    // We do not need on_buffered as this is already evaluated when
+    // creating the buffered side which happens before initializing
+    // `PiecewiseMergeJoinStream`
+    pub on_streamed: PhysicalExprRef,
+    // Type of join
+    pub join_type: JoinType,
+    // Comparison operator
+    pub operator: Operator,
+    // Streamed batch
+    pub streamed: SendableRecordBatchStream,
+    // Streamed schema
+    streamed_schema: SchemaRef,
+    // Buffered side data
+    buffered_side: BufferedSide,
+    // Tracks the state of the `PiecewiseMergeJoin`
+    state: PiecewiseMergeJoinStreamState,
+    // Sort option for buffered and streamed side (specifies whether
+    // the sort is ascending or descending)
+    sort_option: SortOptions,
+    // Metrics for build + probe joins
+    join_metrics: BuildProbeJoinMetrics,
+    // Tracking incremental state for emitting record batches
+    batch_process_state: BatchProcessState,
+    // To synchronize when partition needs to finish
+    remaining_partitions: Arc<AtomicUsize>,
+}
+
+impl RecordBatchStream for ClassicPWMJStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
+// `PiecewiseMergeJoinStreamState` is separated into `WaitBufferedSide`, 
`FetchStreamBatch`,
+// `ProcessStreamBatch`, `ExhaustedStreamSide` and `Completed`.
+//
+// Classic Joins
+//  1. `WaitBufferedSide` - Load in the buffered side data into memory.
+//  2. `FetchStreamBatch` -  Fetch + sort incoming stream batches. We switch 
the state to
+//     `Completed` if there are are still remaining partitions to process. It 
is only switched to
+//     `ExhaustedStreamBatch` if all partitions have been processed.
+//  3. `ProcessStreamBatch` - Compare stream batch row values against the 
buffered side data.
+//  4. `ExhaustedStreamBatch` - If the join type is Left or Inner we will 
return state as
+//      `Completed` however for Full and Right we will need to process the 
unmatched buffered rows.
+impl ClassicPWMJStream {
+    // Creates a new `PiecewiseMergeJoinStream` instance
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        schema: Arc<Schema>,
+        on_streamed: PhysicalExprRef,
+        join_type: JoinType,
+        operator: Operator,
+        streamed: SendableRecordBatchStream,
+        buffered_side: BufferedSide,
+        state: PiecewiseMergeJoinStreamState,
+        sort_option: SortOptions,
+        join_metrics: BuildProbeJoinMetrics,
+        batch_size: usize,
+        remaining_partitions: Arc<AtomicUsize>,
+    ) -> Self {
+        Self {
+            schema: Arc::clone(&schema),
+            on_streamed,
+            join_type,
+            operator,
+            streamed_schema: streamed.schema(),
+            streamed,
+            buffered_side,
+            state,
+            sort_option,
+            join_metrics,
+            batch_process_state: BatchProcessState::new(schema, batch_size),
+            remaining_partitions,
+        }
+    }
+
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        loop {
+            return match self.state {
+                PiecewiseMergeJoinStreamState::WaitBufferedSide => {
+                    handle_state!(ready!(self.collect_buffered_side(cx)))
+                }
+                PiecewiseMergeJoinStreamState::FetchStreamBatch => {
+                    handle_state!(ready!(self.fetch_stream_batch(cx)))
+                }
+                PiecewiseMergeJoinStreamState::ProcessStreamBatch(_) => {
+                    handle_state!(self.process_stream_batch())
+                }
+                PiecewiseMergeJoinStreamState::ExhaustedStreamSide => {
+                    handle_state!(self.process_unmatched_buffered_batch())
+                }
+                PiecewiseMergeJoinStreamState::Completed => Poll::Ready(None),
+            };
+        }
+    }
+
+    // Collects buffered side data
+    fn collect_buffered_side(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+        let build_timer = self.join_metrics.build_time.timer();
+        let buffered_data = ready!(self
+            .buffered_side
+            .try_as_initial_mut()?
+            .buffered_fut
+            .get_shared(cx))?;
+        build_timer.done();
+
+        // We will start fetching stream batches for classic joins
+        self.state = PiecewiseMergeJoinStreamState::FetchStreamBatch;
+
+        self.buffered_side =
+            BufferedSide::Ready(BufferedSideReadyState { buffered_data });
+
+        Poll::Ready(Ok(StatefulStreamResult::Continue))
+    }
+
+    // Fetches incoming stream batches
+    fn fetch_stream_batch(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+        match ready!(self.streamed.poll_next_unpin(cx)) {
+            None => {
+                if self
+                    .remaining_partitions
+                    .fetch_sub(1, std::sync::atomic::Ordering::SeqCst)
+                    == 1
+                {
+                    self.batch_process_state.reset();
+                    self.state = 
PiecewiseMergeJoinStreamState::ExhaustedStreamSide;
+                } else {
+                    self.state = PiecewiseMergeJoinStreamState::Completed;
+                }
+            }
+            Some(Ok(batch)) => {
+                // Evaluate the streamed physical expression on the stream 
batch
+                let stream_values: ArrayRef = self
+                    .on_streamed
+                    .evaluate(&batch)?
+                    .into_array(batch.num_rows())?;
+
+                self.join_metrics.input_batches.add(1);
+                self.join_metrics.input_rows.add(batch.num_rows());
+
+                // Sort stream values and change the streamed record batch 
accordingly
+                let indices = sort_to_indices(
+                    stream_values.as_ref(),
+                    Some(self.sort_option),
+                    None,
+                )?;
+                let stream_batch = take_record_batch(&batch, &indices)?;
+                let stream_values = take(stream_values.as_ref(), &indices, 
None)?;
+
+                // Reset BatchProcessState before processing a new stream batch
+                self.batch_process_state.reset();
+                self.state =
+                    
PiecewiseMergeJoinStreamState::ProcessStreamBatch(StreamedBatch {
+                        batch: stream_batch,
+                        values: vec![stream_values],
+                    });
+            }
+            Some(Err(err)) => return Poll::Ready(Err(err)),
+        };
+
+        Poll::Ready(Ok(StatefulStreamResult::Continue))
+    }
+
+    // Only classic join will call. This function will process stream batches 
and evaluate against
+    // the buffered side data.
+    fn process_stream_batch(
+        &mut self,
+    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
+        let buffered_side = self.buffered_side.try_as_ready_mut()?;
+        let stream_batch = self.state.try_as_process_stream_batch_mut()?;
+
+        if let Some(batch) = self
+            .batch_process_state
+            .output_batches
+            .next_completed_batch()
+        {
+            return Ok(StatefulStreamResult::Ready(Some(batch)));
+        }
+
+        // Produce more work
+        let batch = resolve_classic_join(
+            buffered_side,
+            stream_batch,
+            Arc::clone(&self.schema),
+            self.operator,
+            self.sort_option,
+            self.join_type,
+            &mut self.batch_process_state,
+        )?;
+
+        if !self.batch_process_state.continue_process {
+            // We finished scanning this stream batch.
+            self.batch_process_state
+                .output_batches
+                .finish_buffered_batch()?;
+            if let Some(b) = self
+                .batch_process_state
+                .output_batches
+                .next_completed_batch()
+            {
+                self.state = PiecewiseMergeJoinStreamState::FetchStreamBatch;
+                return Ok(StatefulStreamResult::Ready(Some(b)));
+            }
+            // Nothing pending; hand back whatever `resolve` returned (often 
empty) and move on.
+            self.state = PiecewiseMergeJoinStreamState::FetchStreamBatch;
+            return Ok(StatefulStreamResult::Ready(Some(batch)));
+        }
+
+        Ok(StatefulStreamResult::Ready(Some(batch)))
+    }
+
+    // Process remaining unmatched rows
+    fn process_unmatched_buffered_batch(
+        &mut self,
+    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
+        // Return early for `JoinType::Right` and `JoinType::Inner`
+        if matches!(self.join_type, JoinType::Right | JoinType::Inner) {
+            self.state = PiecewiseMergeJoinStreamState::Completed;
+            return Ok(StatefulStreamResult::Ready(None));
+        }
+
+        if !self.batch_process_state.continue_process {
+            if let Some(batch) = self
+                .batch_process_state
+                .output_batches
+                .next_completed_batch()
+            {
+                return Ok(StatefulStreamResult::Ready(Some(batch)));
+            }
+
+            self.batch_process_state
+                .output_batches
+                .finish_buffered_batch()?;
+            if let Some(batch) = self
+                .batch_process_state
+                .output_batches
+                .next_completed_batch()
+            {
+                self.state = PiecewiseMergeJoinStreamState::Completed;
+                return Ok(StatefulStreamResult::Ready(Some(batch)));
+            }
+        }
+
+        let buffered_data =
+            
Arc::clone(&self.buffered_side.try_as_ready().unwrap().buffered_data);
+
+        let (buffered_indices, _streamed_indices) = 
get_final_indices_from_shared_bitmap(
+            &buffered_data.visited_indices_bitmap,
+            self.join_type,
+            true,
+        );
+
+        let new_buffered_batch =
+            take_record_batch(buffered_data.batch(), &buffered_indices)?;
+        let mut buffered_columns = new_buffered_batch.columns().to_vec();
+
+        let streamed_columns: Vec<ArrayRef> = self
+            .streamed_schema
+            .fields()
+            .iter()
+            .map(|f| new_null_array(f.data_type(), 
new_buffered_batch.num_rows()))
+            .collect();
+
+        buffered_columns.extend(streamed_columns);
+
+        let batch = RecordBatch::try_new(Arc::clone(&self.schema), 
buffered_columns)?;
+
+        self.batch_process_state.output_batches.push_batch(batch)?;
+
+        self.batch_process_state.continue_process = false;
+        if let Some(batch) = self
+            .batch_process_state
+            .output_batches
+            .next_completed_batch()
+        {
+            return Ok(StatefulStreamResult::Ready(Some(batch)));
+        }
+
+        self.batch_process_state
+            .output_batches
+            .finish_buffered_batch()?;
+        if let Some(batch) = self
+            .batch_process_state
+            .output_batches
+            .next_completed_batch()
+        {
+            self.state = PiecewiseMergeJoinStreamState::Completed;
+            return Ok(StatefulStreamResult::Ready(Some(batch)));
+        }
+
+        self.state = PiecewiseMergeJoinStreamState::Completed;
+        self.batch_process_state.reset();
+        Ok(StatefulStreamResult::Ready(None))
+    }
+}
+
+struct BatchProcessState {
+    // Used to pick up from the last index on the stream side
+    output_batches: Box<BatchCoalescer>,
+    // Used to store the unmatched stream indices for `JoinType::Right` and 
`JoinType::Full`
+    unmatched_indices: PrimitiveBuilder<UInt32Type>,
+    // Used to store the start index on the buffered side; used to resume 
processing on the correct
+    // row
+    start_buffer_idx: usize,
+    // Used to store the start index on the stream side; used to resume 
processing on the correct
+    // row
+    start_stream_idx: usize,
+    // Signals if we found a match for the current stream row
+    found: bool,
+    // Signals to continue processing the current stream batch
+    continue_process: bool,
+}
+
+impl BatchProcessState {
+    pub(crate) fn new(schema: Arc<Schema>, batch_size: usize) -> Self {
+        Self {
+            output_batches: Box::new(BatchCoalescer::new(schema, batch_size)),
+            unmatched_indices: PrimitiveBuilder::new(),
+            start_buffer_idx: 0,
+            start_stream_idx: 0,
+            found: false,
+            continue_process: true,
+        }
+    }
+
+    pub(crate) fn reset(&mut self) {
+        self.unmatched_indices = PrimitiveBuilder::new();
+        self.start_buffer_idx = 0;
+        self.start_stream_idx = 0;
+        self.found = false;
+        self.continue_process = true;
+    }
+}
+
+impl Stream for ClassicPWMJStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        self.poll_next_impl(cx)
+    }
+}
+
+// For Left, Right, Full, and Inner joins, incoming stream batches will 
already be sorted.
+#[allow(clippy::too_many_arguments)]
+fn resolve_classic_join(
+    buffered_side: &mut BufferedSideReadyState,
+    stream_batch: &StreamedBatch,
+    join_schema: Arc<Schema>,
+    operator: Operator,
+    sort_options: SortOptions,
+    join_type: JoinType,
+    batch_process_state: &mut BatchProcessState,
+) -> Result<RecordBatch> {
+    let buffered_len = buffered_side.buffered_data.values().len();
+    let stream_values = stream_batch.values();
+
+    let mut buffer_idx = batch_process_state.start_buffer_idx;
+    let stream_idx = batch_process_state.start_stream_idx;
+
+    // Our buffer_idx variable allows us to start probing on the buffered side 
where we last matched
+    // in the previous stream row.
+    for row_idx in stream_idx..stream_batch.batch.num_rows() {
+        while buffer_idx < buffered_len {
+            let compare = {
+                let buffered_values = buffered_side.buffered_data.values();
+                compare_join_arrays(
+                    &[Arc::clone(&stream_values[0])],
+                    row_idx,
+                    &[Arc::clone(buffered_values)],
+                    buffer_idx,
+                    &[sort_options],
+                    NullEquality::NullEqualsNothing,
+                )?
+            };
+
+            // If we find a match we append all indices and move to the next 
stream row index
+            match operator {
+                Operator::Gt | Operator::Lt => {
+                    if matches!(compare, Ordering::Less) {
+                        batch_process_state.found = true;
+                        let count = buffered_len - buffer_idx;
+
+                        let batch = build_matched_indices(
+                            (buffer_idx, count),
+                            (row_idx, count),
+                            buffered_side,
+                            stream_batch,
+                            join_type,
+                            Arc::clone(&join_schema),
+                        )?;
+
+                        batch_process_state.output_batches.push_batch(batch)?;
+
+                        // Flush batch and update pointers if we have a 
completed batch
+                        if let Some(batch) =
+                            
batch_process_state.output_batches.next_completed_batch()
+                        {
+                            batch_process_state.found = false;
+                            batch_process_state.start_buffer_idx = buffer_idx;
+                            batch_process_state.start_stream_idx = row_idx + 1;
+                            return Ok(batch);
+                        }
+
+                        break;
+                    }
+                }
+                Operator::GtEq | Operator::LtEq => {
+                    if matches!(compare, Ordering::Equal | Ordering::Less) {
+                        batch_process_state.found = true;
+                        let count = buffered_len - buffer_idx;
+                        let batch = build_matched_indices(
+                            (buffer_idx, count),
+                            (row_idx, count),
+                            buffered_side,
+                            stream_batch,
+                            join_type,
+                            Arc::clone(&join_schema),
+                        )?;
+
+                        // Flush batch and update pointers if we have a 
completed batch
+                        batch_process_state.output_batches.push_batch(batch)?;
+                        if let Some(batch) =
+                            
batch_process_state.output_batches.next_completed_batch()
+                        {
+                            batch_process_state.found = false;
+                            batch_process_state.start_buffer_idx = buffer_idx;
+                            batch_process_state.start_stream_idx = row_idx + 1;
+                            return Ok(batch);
+                        }
+
+                        break;
+                    }
+                }
+                _ => {
+                    return exec_err!(

Review Comment:
   This looks like a internal error instead



##########
datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs:
##########
@@ -0,0 +1,1533 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream Implementation for PiecewiseMergeJoin's Classic Join (Left, Right, 
Full, Inner)
+
+use arrow::array::{new_null_array, Array, PrimitiveBuilder};
+use arrow::compute::{take, BatchCoalescer};
+use arrow::datatypes::UInt32Type;
+use arrow::{
+    array::{ArrayRef, RecordBatch, UInt32Array},
+    compute::{sort_to_indices, take_record_batch},
+};
+use arrow_schema::{Schema, SchemaRef, SortOptions};
+use datafusion_common::NullEquality;
+use datafusion_common::{exec_err, internal_err, Result};
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_expr::{JoinType, Operator};
+use datafusion_physical_expr::PhysicalExprRef;
+use futures::{Stream, StreamExt};
+use std::sync::atomic::AtomicUsize;
+use std::{cmp::Ordering, task::ready};
+use std::{sync::Arc, task::Poll};
+
+use crate::handle_state;
+use crate::joins::piecewise_merge_join::exec::{BufferedSide, 
BufferedSideReadyState};
+use crate::joins::piecewise_merge_join::utils::need_produce_result_in_final;
+use crate::joins::utils::{compare_join_arrays, 
get_final_indices_from_shared_bitmap};
+use crate::joins::utils::{BuildProbeJoinMetrics, StatefulStreamResult};
+
+pub(super) enum PiecewiseMergeJoinStreamState {
+    WaitBufferedSide,
+    FetchStreamBatch,
+    ProcessStreamBatch(StreamedBatch),
+    ExhaustedStreamSide,
+    Completed,
+}
+
+impl PiecewiseMergeJoinStreamState {
+    // Grab mutable reference to the current stream batch
+    fn try_as_process_stream_batch_mut(&mut self) -> Result<&mut 
StreamedBatch> {
+        match self {
+            PiecewiseMergeJoinStreamState::ProcessStreamBatch(state) => 
Ok(state),
+            _ => internal_err!("Expected streamed batch in StreamBatch"),
+        }
+    }
+}
+
+pub(super) struct StreamedBatch {
+    pub batch: RecordBatch,
+    values: Vec<ArrayRef>,
+}
+
+impl StreamedBatch {
+    #[allow(dead_code)]
+    fn new(batch: RecordBatch, values: Vec<ArrayRef>) -> Self {
+        Self { batch, values }
+    }
+
+    fn values(&self) -> &Vec<ArrayRef> {
+        &self.values
+    }
+}
+
+pub(super) struct ClassicPWMJStream {
+    // Output schema of the `PiecewiseMergeJoin`
+    pub schema: Arc<Schema>,
+
+    // Physical expression that is evaluated on the streamed side
+    // We do not need on_buffered as this is already evaluated when
+    // creating the buffered side which happens before initializing
+    // `PiecewiseMergeJoinStream`
+    pub on_streamed: PhysicalExprRef,
+    // Type of join
+    pub join_type: JoinType,
+    // Comparison operator
+    pub operator: Operator,
+    // Streamed batch
+    pub streamed: SendableRecordBatchStream,
+    // Streamed schema
+    streamed_schema: SchemaRef,
+    // Buffered side data
+    buffered_side: BufferedSide,
+    // Tracks the state of the `PiecewiseMergeJoin`
+    state: PiecewiseMergeJoinStreamState,
+    // Sort option for buffered and streamed side (specifies whether
+    // the sort is ascending or descending)
+    sort_option: SortOptions,
+    // Metrics for build + probe joins
+    join_metrics: BuildProbeJoinMetrics,
+    // Tracking incremental state for emitting record batches
+    batch_process_state: BatchProcessState,
+    // To synchronize when partition needs to finish
+    remaining_partitions: Arc<AtomicUsize>,
+}
+
+impl RecordBatchStream for ClassicPWMJStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
+// `PiecewiseMergeJoinStreamState` is separated into `WaitBufferedSide`, 
`FetchStreamBatch`,
+// `ProcessStreamBatch`, `ExhaustedStreamSide` and `Completed`.
+//
+// Classic Joins
+//  1. `WaitBufferedSide` - Load in the buffered side data into memory.
+//  2. `FetchStreamBatch` -  Fetch + sort incoming stream batches. We switch 
the state to
+//     `Completed` if there are are still remaining partitions to process. It 
is only switched to
+//     `ExhaustedStreamBatch` if all partitions have been processed.
+//  3. `ProcessStreamBatch` - Compare stream batch row values against the 
buffered side data.
+//  4. `ExhaustedStreamBatch` - If the join type is Left or Inner we will 
return state as
+//      `Completed` however for Full and Right we will need to process the 
unmatched buffered rows.
+impl ClassicPWMJStream {
+    // Creates a new `PiecewiseMergeJoinStream` instance
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        schema: Arc<Schema>,
+        on_streamed: PhysicalExprRef,
+        join_type: JoinType,
+        operator: Operator,
+        streamed: SendableRecordBatchStream,
+        buffered_side: BufferedSide,
+        state: PiecewiseMergeJoinStreamState,
+        sort_option: SortOptions,
+        join_metrics: BuildProbeJoinMetrics,
+        batch_size: usize,
+        remaining_partitions: Arc<AtomicUsize>,
+    ) -> Self {
+        Self {
+            schema: Arc::clone(&schema),
+            on_streamed,
+            join_type,
+            operator,
+            streamed_schema: streamed.schema(),
+            streamed,
+            buffered_side,
+            state,
+            sort_option,
+            join_metrics,
+            batch_process_state: BatchProcessState::new(schema, batch_size),
+            remaining_partitions,
+        }
+    }
+
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        loop {
+            return match self.state {
+                PiecewiseMergeJoinStreamState::WaitBufferedSide => {
+                    handle_state!(ready!(self.collect_buffered_side(cx)))
+                }
+                PiecewiseMergeJoinStreamState::FetchStreamBatch => {
+                    handle_state!(ready!(self.fetch_stream_batch(cx)))
+                }
+                PiecewiseMergeJoinStreamState::ProcessStreamBatch(_) => {
+                    handle_state!(self.process_stream_batch())
+                }
+                PiecewiseMergeJoinStreamState::ExhaustedStreamSide => {
+                    handle_state!(self.process_unmatched_buffered_batch())
+                }
+                PiecewiseMergeJoinStreamState::Completed => Poll::Ready(None),
+            };
+        }
+    }
+
+    // Collects buffered side data
+    fn collect_buffered_side(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+        let build_timer = self.join_metrics.build_time.timer();
+        let buffered_data = ready!(self
+            .buffered_side
+            .try_as_initial_mut()?
+            .buffered_fut
+            .get_shared(cx))?;
+        build_timer.done();
+
+        // We will start fetching stream batches for classic joins
+        self.state = PiecewiseMergeJoinStreamState::FetchStreamBatch;
+
+        self.buffered_side =
+            BufferedSide::Ready(BufferedSideReadyState { buffered_data });
+
+        Poll::Ready(Ok(StatefulStreamResult::Continue))
+    }
+
+    // Fetches incoming stream batches
+    fn fetch_stream_batch(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+        match ready!(self.streamed.poll_next_unpin(cx)) {
+            None => {
+                if self
+                    .remaining_partitions
+                    .fetch_sub(1, std::sync::atomic::Ordering::SeqCst)
+                    == 1
+                {
+                    self.batch_process_state.reset();
+                    self.state = 
PiecewiseMergeJoinStreamState::ExhaustedStreamSide;
+                } else {
+                    self.state = PiecewiseMergeJoinStreamState::Completed;
+                }
+            }
+            Some(Ok(batch)) => {
+                // Evaluate the streamed physical expression on the stream 
batch
+                let stream_values: ArrayRef = self
+                    .on_streamed
+                    .evaluate(&batch)?
+                    .into_array(batch.num_rows())?;
+
+                self.join_metrics.input_batches.add(1);
+                self.join_metrics.input_rows.add(batch.num_rows());
+
+                // Sort stream values and change the streamed record batch 
accordingly
+                let indices = sort_to_indices(
+                    stream_values.as_ref(),
+                    Some(self.sort_option),
+                    None,
+                )?;
+                let stream_batch = take_record_batch(&batch, &indices)?;
+                let stream_values = take(stream_values.as_ref(), &indices, 
None)?;
+
+                // Reset BatchProcessState before processing a new stream batch
+                self.batch_process_state.reset();
+                self.state =
+                    
PiecewiseMergeJoinStreamState::ProcessStreamBatch(StreamedBatch {
+                        batch: stream_batch,
+                        values: vec![stream_values],
+                    });
+            }
+            Some(Err(err)) => return Poll::Ready(Err(err)),
+        };
+
+        Poll::Ready(Ok(StatefulStreamResult::Continue))
+    }
+
+    // Only classic join will call. This function will process stream batches 
and evaluate against
+    // the buffered side data.
+    fn process_stream_batch(
+        &mut self,
+    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
+        let buffered_side = self.buffered_side.try_as_ready_mut()?;
+        let stream_batch = self.state.try_as_process_stream_batch_mut()?;
+
+        if let Some(batch) = self
+            .batch_process_state
+            .output_batches
+            .next_completed_batch()
+        {
+            return Ok(StatefulStreamResult::Ready(Some(batch)));
+        }
+
+        // Produce more work
+        let batch = resolve_classic_join(
+            buffered_side,
+            stream_batch,
+            Arc::clone(&self.schema),
+            self.operator,
+            self.sort_option,
+            self.join_type,
+            &mut self.batch_process_state,
+        )?;
+
+        if !self.batch_process_state.continue_process {
+            // We finished scanning this stream batch.
+            self.batch_process_state
+                .output_batches
+                .finish_buffered_batch()?;
+            if let Some(b) = self
+                .batch_process_state
+                .output_batches
+                .next_completed_batch()
+            {
+                self.state = PiecewiseMergeJoinStreamState::FetchStreamBatch;
+                return Ok(StatefulStreamResult::Ready(Some(b)));
+            }
+            // Nothing pending; hand back whatever `resolve` returned (often 
empty) and move on.
+            self.state = PiecewiseMergeJoinStreamState::FetchStreamBatch;
+            return Ok(StatefulStreamResult::Ready(Some(batch)));
+        }
+
+        Ok(StatefulStreamResult::Ready(Some(batch)))
+    }
+
+    // Process remaining unmatched rows
+    fn process_unmatched_buffered_batch(
+        &mut self,
+    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
+        // Return early for `JoinType::Right` and `JoinType::Inner`
+        if matches!(self.join_type, JoinType::Right | JoinType::Inner) {
+            self.state = PiecewiseMergeJoinStreamState::Completed;
+            return Ok(StatefulStreamResult::Ready(None));
+        }
+
+        if !self.batch_process_state.continue_process {
+            if let Some(batch) = self
+                .batch_process_state
+                .output_batches
+                .next_completed_batch()
+            {
+                return Ok(StatefulStreamResult::Ready(Some(batch)));
+            }
+
+            self.batch_process_state
+                .output_batches
+                .finish_buffered_batch()?;
+            if let Some(batch) = self
+                .batch_process_state
+                .output_batches
+                .next_completed_batch()
+            {
+                self.state = PiecewiseMergeJoinStreamState::Completed;
+                return Ok(StatefulStreamResult::Ready(Some(batch)));
+            }
+        }
+
+        let buffered_data =
+            
Arc::clone(&self.buffered_side.try_as_ready().unwrap().buffered_data);
+
+        let (buffered_indices, _streamed_indices) = 
get_final_indices_from_shared_bitmap(
+            &buffered_data.visited_indices_bitmap,
+            self.join_type,
+            true,
+        );
+
+        let new_buffered_batch =
+            take_record_batch(buffered_data.batch(), &buffered_indices)?;
+        let mut buffered_columns = new_buffered_batch.columns().to_vec();
+
+        let streamed_columns: Vec<ArrayRef> = self
+            .streamed_schema
+            .fields()
+            .iter()
+            .map(|f| new_null_array(f.data_type(), 
new_buffered_batch.num_rows()))
+            .collect();
+
+        buffered_columns.extend(streamed_columns);
+
+        let batch = RecordBatch::try_new(Arc::clone(&self.schema), 
buffered_columns)?;
+
+        self.batch_process_state.output_batches.push_batch(batch)?;
+
+        self.batch_process_state.continue_process = false;
+        if let Some(batch) = self
+            .batch_process_state
+            .output_batches
+            .next_completed_batch()
+        {
+            return Ok(StatefulStreamResult::Ready(Some(batch)));
+        }
+
+        self.batch_process_state
+            .output_batches
+            .finish_buffered_batch()?;
+        if let Some(batch) = self
+            .batch_process_state
+            .output_batches
+            .next_completed_batch()
+        {
+            self.state = PiecewiseMergeJoinStreamState::Completed;
+            return Ok(StatefulStreamResult::Ready(Some(batch)));
+        }
+
+        self.state = PiecewiseMergeJoinStreamState::Completed;
+        self.batch_process_state.reset();
+        Ok(StatefulStreamResult::Ready(None))
+    }
+}
+
+struct BatchProcessState {
+    // Used to pick up from the last index on the stream side
+    output_batches: Box<BatchCoalescer>,
+    // Used to store the unmatched stream indices for `JoinType::Right` and 
`JoinType::Full`
+    unmatched_indices: PrimitiveBuilder<UInt32Type>,
+    // Used to store the start index on the buffered side; used to resume 
processing on the correct
+    // row
+    start_buffer_idx: usize,
+    // Used to store the start index on the stream side; used to resume 
processing on the correct
+    // row
+    start_stream_idx: usize,
+    // Signals if we found a match for the current stream row
+    found: bool,
+    // Signals to continue processing the current stream batch
+    continue_process: bool,
+}
+
+impl BatchProcessState {
+    pub(crate) fn new(schema: Arc<Schema>, batch_size: usize) -> Self {
+        Self {
+            output_batches: Box::new(BatchCoalescer::new(schema, batch_size)),
+            unmatched_indices: PrimitiveBuilder::new(),
+            start_buffer_idx: 0,
+            start_stream_idx: 0,
+            found: false,
+            continue_process: true,
+        }
+    }
+
+    pub(crate) fn reset(&mut self) {
+        self.unmatched_indices = PrimitiveBuilder::new();
+        self.start_buffer_idx = 0;
+        self.start_stream_idx = 0;
+        self.found = false;
+        self.continue_process = true;
+    }
+}
+
+impl Stream for ClassicPWMJStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        self.poll_next_impl(cx)
+    }
+}
+
+// For Left, Right, Full, and Inner joins, incoming stream batches will 
already be sorted.
+#[allow(clippy::too_many_arguments)]
+fn resolve_classic_join(
+    buffered_side: &mut BufferedSideReadyState,
+    stream_batch: &StreamedBatch,
+    join_schema: Arc<Schema>,
+    operator: Operator,
+    sort_options: SortOptions,
+    join_type: JoinType,
+    batch_process_state: &mut BatchProcessState,
+) -> Result<RecordBatch> {
+    let buffered_len = buffered_side.buffered_data.values().len();
+    let stream_values = stream_batch.values();
+
+    let mut buffer_idx = batch_process_state.start_buffer_idx;
+    let stream_idx = batch_process_state.start_stream_idx;
+
+    // Our buffer_idx variable allows us to start probing on the buffered side 
where we last matched
+    // in the previous stream row.
+    for row_idx in stream_idx..stream_batch.batch.num_rows() {
+        while buffer_idx < buffered_len {
+            let compare = {
+                let buffered_values = buffered_side.buffered_data.values();
+                compare_join_arrays(
+                    &[Arc::clone(&stream_values[0])],
+                    row_idx,
+                    &[Arc::clone(buffered_values)],
+                    buffer_idx,
+                    &[sort_options],
+                    NullEquality::NullEqualsNothing,
+                )?
+            };
+
+            // If we find a match we append all indices and move to the next 
stream row index
+            match operator {
+                Operator::Gt | Operator::Lt => {
+                    if matches!(compare, Ordering::Less) {
+                        batch_process_state.found = true;
+                        let count = buffered_len - buffer_idx;
+
+                        let batch = build_matched_indices(
+                            (buffer_idx, count),
+                            (row_idx, count),
+                            buffered_side,
+                            stream_batch,
+                            join_type,
+                            Arc::clone(&join_schema),
+                        )?;
+
+                        batch_process_state.output_batches.push_batch(batch)?;
+
+                        // Flush batch and update pointers if we have a 
completed batch
+                        if let Some(batch) =
+                            
batch_process_state.output_batches.next_completed_batch()
+                        {
+                            batch_process_state.found = false;
+                            batch_process_state.start_buffer_idx = buffer_idx;
+                            batch_process_state.start_stream_idx = row_idx + 1;
+                            return Ok(batch);
+                        }
+
+                        break;
+                    }
+                }
+                Operator::GtEq | Operator::LtEq => {
+                    if matches!(compare, Ordering::Equal | Ordering::Less) {
+                        batch_process_state.found = true;
+                        let count = buffered_len - buffer_idx;
+                        let batch = build_matched_indices(
+                            (buffer_idx, count),
+                            (row_idx, count),
+                            buffered_side,
+                            stream_batch,
+                            join_type,
+                            Arc::clone(&join_schema),
+                        )?;
+
+                        // Flush batch and update pointers if we have a 
completed batch
+                        batch_process_state.output_batches.push_batch(batch)?;
+                        if let Some(batch) =
+                            
batch_process_state.output_batches.next_completed_batch()
+                        {
+                            batch_process_state.found = false;
+                            batch_process_state.start_buffer_idx = buffer_idx;
+                            batch_process_state.start_stream_idx = row_idx + 1;
+                            return Ok(batch);
+                        }
+
+                        break;
+                    }
+                }
+                _ => {
+                    return exec_err!(
+                        "PiecewiseMergeJoin should not contain operator, {}",
+                        operator
+                    )
+                }
+            };
+
+            // Increment buffer_idx after every row
+            buffer_idx += 1;
+        }
+
+        // If a match was not found for the current stream row index the 
stream indice is appended
+        // to the unmatched indices to be flushed later.
+        if matches!(join_type, JoinType::Right | JoinType::Full)
+            && !batch_process_state.found
+        {
+            batch_process_state
+                .unmatched_indices
+                .append_value(row_idx as u32);
+        }
+
+        batch_process_state.found = false;
+    }
+
+    // Flushed all unmatched indices on the streamed side
+    if matches!(join_type, JoinType::Right | JoinType::Full) {
+        let batch = create_unmatched_batch(
+            &mut batch_process_state.unmatched_indices,
+            stream_batch,
+            Arc::clone(&join_schema),
+        )?;
+
+        batch_process_state.output_batches.push_batch(batch)?;
+    }
+
+    batch_process_state.continue_process = false;
+    Ok(RecordBatch::new_empty(Arc::clone(&join_schema)))
+}
+
+// Builds a record batch from indices ranges on the buffered and streamed side.
+//
+// The two ranges are: buffered_range: (start index, count) and 
streamed_range: (start index, count) due
+// to batch.slice(start, count).
+fn build_matched_indices(

Review Comment:
   It can use the name `build_matched_and_set_buffered_bitmap` to be more 
descriptive.



##########
datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs:
##########
@@ -0,0 +1,735 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::Array;
+use arrow::{
+    array::{ArrayRef, BooleanBufferBuilder, RecordBatch},
+    compute::concat_batches,
+    util::bit_util,
+};
+use arrow_schema::{SchemaRef, SortOptions};
+use datafusion_common::not_impl_err;
+use datafusion_common::{internal_err, JoinSide, Result};
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    SendableRecordBatchStream,
+};
+use datafusion_expr::{JoinType, Operator};
+use datafusion_physical_expr::equivalence::join_equivalence_properties;
+use datafusion_physical_expr::{
+    Distribution, LexOrdering, OrderingRequirements, PhysicalExpr, 
PhysicalExprRef,
+    PhysicalSortExpr,
+};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use futures::TryStreamExt;
+use parking_lot::Mutex;
+use std::fmt::Formatter;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+use crate::execution_plan::{boundedness_from_children, EmissionType};
+
+use crate::joins::piecewise_merge_join::classic_join::{
+    ClassicPWMJStream, PiecewiseMergeJoinStreamState,
+};
+use crate::joins::piecewise_merge_join::utils::{
+    build_visited_indices_map, is_existence_join, is_right_existence_join,
+};
+use crate::joins::utils::asymmetric_join_output_partitioning;
+use crate::{
+    joins::{
+        utils::{build_join_schema, BuildProbeJoinMetrics, OnceAsync, OnceFut},
+        SharedBitmapBuilder,
+    },
+    metrics::ExecutionPlanMetricsSet,
+    spill::get_record_batch_memory_size,
+    ExecutionPlan, PlanProperties,
+};
+use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties};
+
+/// `PiecewiseMergeJoinExec` is a join execution plan that only evaluates 
single range filter and show much
+/// better performance for these workloads than `NestedLoopJoin`
+///
+/// The physical planner will choose to evaluate this join when there is only 
one comparison filter. This
+/// is a binary expression which contains [`Operator::Lt`], 
[`Operator::LtEq`], [`Operator::Gt`], and
+/// [`Operator::GtEq`].:
+/// Examples:
+///  - `col0` < `colb`, `col0` <= `colb`, `col0` > `colb`, `col0` >= `colb`
+///
+/// # Execution Plan Inputs
+/// For `PiecewiseMergeJoin` we label all right inputs as the `streamed' side 
and the left outputs as the
+/// 'buffered' side.
+///
+/// `PiecewiseMergeJoin` takes a sorted input for the side to be buffered and 
is able to sort streamed record
+/// batches during processing. Sorted input must specifically be 
ascending/descending based on the operator.
+///
+/// # Algorithms
+/// Classic joins are processed differently compared to existence joins.
+///
+/// ## Classic Joins (Inner, Full, Left, Right)
+/// For classic joins we buffer the right side (the "build" side) and stream 
the left side (the "probe" side).
+/// Both sides are sorted so that we can iterate from index 0 to the end on 
each side.  This ordering ensures
+/// that when we find the first matching pair of rows, we can emit the current 
left row joined with all remaining
+/// right rows from the match position onward, without rescanning earlier 
right rows.
+///  
+/// For `<` and `<=` operators, both inputs are sorted in **descending** 
order, while for `>` and `>=` operators
+/// they are sorted in **ascending** order. This choice ensures that the 
pointer on the buffered side can advance
+/// monotonically as we stream new batches from the left side.
+///
+/// The streamed (left) side may arrive unsorted, so this operator sorts each 
incoming batch in memory before
+/// processing. The buffered (right) side is required to be globally sorted; 
the plan declares this requirement
+/// in `requires_input_order`, which allows the optimizer to automatically 
insert a `SortExec` on that side if needed.
+/// By the time this operator runs, the right side is guaranteed to be in the 
proper order.
+///
+/// The pseudocode for the algorithm looks like this:
+///
+/// ```text
+/// for stream_row in stream_batch:
+///     for buffer_row in buffer_batch:
+///         if compare(stream_row, probe_row):
+///             output stream_row X buffer_batch[buffer_row:]
+///         else:
+///             continue
+/// ```
+///
+/// The algorithm uses the streamed side (larger) to drive the loop. This is 
due to every row on the stream side iterating
+/// the buffered side to find every first match. By doing this, each match can 
output more result so that output
+/// handling can be better vectorized for performance.
+///
+/// Here is an example:
+///
+/// We perform a `JoinType::Left` with these two batches and the operator 
being `Operator::Lt`(<). For each
+/// row on the streamed side we move a pointer on the buffered until it 
matches the condition. Once we reach
+/// the row which matches (in this case with row 1 on streamed will have its 
first match on row 2 on
+/// buffered; 100 < 200 is true), we can emit all rows after that match. We 
can emit the rows like this because
+/// if the batch is sorted in ascending order, every subsequent row will also 
satisfy the condition as they will
+/// all be larger values.
+///
+/// ```text
+/// SQL statement:
+/// SELECT *
+/// FROM (VALUES (100), (200), (500)) AS streamed(a)
+/// LEFT JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
+///   ON streamed.a < buffered.b;
+///
+/// Processing Row 1:
+///
+///       Sorted Buffered Side                                         Sorted 
Streamed Side          
+///       ┌──────────────────┐                                         
┌──────────────────┐         
+///     1 │       100        │                                       1 │       
100        │        
+///       ├──────────────────┤                                         
├──────────────────┤         
+///     2 │       200        │ ─┐                                    2 │       
200        │        
+///       ├──────────────────┤  │  For row 1 on streamed side with     
├──────────────────┤         
+///     3 │       200        │  │  value 100, we emit rows 2 - 5.    3 │       
500        │       
+///       ├──────────────────┤  │  as matches when the operator is     
└──────────────────┘
+///     4 │       300        │  │  `Operator::Lt` (<) Emitting all
+///       ├──────────────────┤  │  rows after the first match (row
+///     5 │       400        │ ─┘  2 buffered side; 100 < 200)
+///       └──────────────────┘     
+///
+/// Processing Row 2:
+///   By sorting the streamed side we know
+///
+///       Sorted Buffered Side                                         Sorted 
Streamed Side          
+///       ┌──────────────────┐                                         
┌──────────────────┐         
+///     1 │       100        │                                       1 │       
100        │        
+///       ├──────────────────┤                                         
├──────────────────┤         
+///     2 │       200        │ <- Start here when probing for the    2 │       
200        │        
+///       ├──────────────────┤    streamed side row 2.                 
├──────────────────┤         
+///     3 │       200        │                                       3 │       
500        │       
+///       ├──────────────────┤                                         
└──────────────────┘
+///     4 │       300        │  
+///       ├──────────────────┤  
+///     5 │       400        │
+///       └──────────────────┘     
+///
+/// ```
+///
+/// ## Existence Joins (Semi, Anti, Mark)
+/// Existence joins are made magnitudes of times faster with a 
`PiecewiseMergeJoin` as we only need to find
+/// the min/max value of the streamed side to be able to emit all matches on 
the buffered side. By putting
+/// the side we need to mark onto the sorted buffer side, we can emit all 
these matches at once.
+///
+/// For less than operations (`<`) both inputs are to be sorted in descending 
order and vice versa for greater
+/// than (`>`) operations. `SortExec` is used to enforce sorting on the 
buffered side and streamed side does not
+/// need to be sorted due to only needing to find the min/max.
+///
+/// For Left Semi, Anti, and Mark joins we swap the inputs so that the marked 
side is on the buffered side.
+///
+/// The pseudocode for the algorithm looks like this:
+///
+/// ```text
+/// // Using the example of a less than `<` operation
+/// let max = max_batch(streamed_batch)
+///
+/// for buffer_row in buffer_batch:
+///     if buffer_row < max:
+///         output buffer_batch[buffer_row:]
+/// ```
+///
+/// Only need to find the min/max value and iterate through the buffered side 
once.
+///
+/// Here is an example:
+/// We perform a `JoinType::LeftSemi` with these two batches and the operator 
being `Operator::Lt`(<). Because
+/// the operator is `Operator::Lt` we can find the minimum value in the 
streamed side; in this case it is 200.
+/// We can then advance a pointer from the start of the buffer side until we 
find the first value that satisfies
+/// the predicate. All rows after that first matched value satisfy the 
condition 200 < x so we can mark all of
+/// those rows as matched.
+///
+/// ```text
+/// SQL statement:
+/// SELECT *
+/// FROM (VALUES (500), (200), (300)) AS streamed(a)
+/// LEFT SEMI JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
+///   ON streamed.a < buffered.b;
+///
+///          Sorted Buffered Side             Unsorted Streamed Side
+///            ┌──────────────────┐          ┌──────────────────┐
+///          1 │       100        │        1 │       500        │
+///            ├──────────────────┤          ├──────────────────┤
+///          2 │       200        │        2 │       200        │
+///            ├──────────────────┤          ├──────────────────┤    
+///          3 │       200        │        3 │       300        │
+///            ├──────────────────┤          └──────────────────┘
+///          4 │       300        │ ─┐       
+///            ├──────────────────┤  | We emit matches for row 4 - 5
+///          5 │       400        │ ─┘ on the buffered side.
+///            └──────────────────┘
+///             min value: 200
+/// ```
+///
+/// For both types of joins, the buffered side must be sorted ascending for 
`Operator::Lt` (<) or
+/// `Operator::LtEq` (<=) and descending for `Operator::Gt` (>) or 
`Operator::GtEq` (>=).
+///
+/// # Performance Explanation (cost)
+/// Piecewise Merge Join is used over Nested Loop Join due to its superior 
performance. Here is the breakdown:
+///
+/// R: Buffered Side
+/// S: Streamed Side
+///
+/// ## Piecewise Merge Join (PWMJ)
+///
+/// # Classic Join:
+/// Requires sorting the probe side and, for each probe row, scanning the 
buffered side until the first match
+/// is found.
+///     Complexity: `O(sort(S) + num_of_batches(|S|) * scan(R))`.
+///
+/// # Mark Join:
+/// Sorts the probe side, then computes the min/max range of the probe keys 
and scans the buffered side only
+/// within that range.  
+///   Complexity: `O(|S| + scan(R[range]))`.
+///
+/// ## Nested Loop Join
+/// Compares every row from `S` with every row from `R`.  
+///   Complexity: `O(|S| * |R|)`.
+///
+/// ## Nested Loop Join
+///   Always going to be probe (O(S) * O(R)).
+///
+/// # Further Reference Material
+/// DuckDB blog on Range Joins: [Range Joins in 
DuckDB](https://duckdb.org/2022/05/27/iejoin.html)
+#[derive(Debug)]
+pub struct PiecewiseMergeJoinExec {
+    /// Left buffered execution plan
+    pub buffered: Arc<dyn ExecutionPlan>,
+    /// Right streamed execution plan
+    pub streamed: Arc<dyn ExecutionPlan>,
+    /// The two expressions being compared
+    pub on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
+    /// Comparison operator in the range predicate
+    pub operator: Operator,
+    /// How the join is performed
+    pub join_type: JoinType,
+    /// The schema once the join is applied
+    schema: SchemaRef,
+    /// Buffered data
+    buffered_fut: OnceAsync<BufferedSideData>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+
+    /// Sort expressions - See above for more details 
[`PiecewiseMergeJoinExec`]
+    ///
+    /// The left sort order, descending for `<`, `<=` operations + ascending 
for `>`, `>=` operations
+    left_child_plan_required_order: LexOrdering,
+    /// The right sort order, descending for `<`, `<=` operations + ascending 
for `>`, `>=` operations
+    /// Unsorted for mark joins
+    #[allow(unused)]
+    right_batch_required_orders: LexOrdering,
+
+    /// This determines the sort order of all join columns used in sorting the 
stream and buffered execution plans.
+    sort_options: SortOptions,
+    /// Cache holding plan properties like equivalences, output partitioning 
etc.
+    cache: PlanProperties,
+    /// Number of partitions to process
+    remaining_partitions: Arc<AtomicUsize>,
+}
+
+impl PiecewiseMergeJoinExec {
+    pub fn try_new(
+        buffered: Arc<dyn ExecutionPlan>,
+        streamed: Arc<dyn ExecutionPlan>,
+        on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
+        operator: Operator,
+        join_type: JoinType,
+        num_partitions: Arc<AtomicUsize>,
+    ) -> Result<Self> {
+        // TODO: Implement existence joins for PiecewiseMergeJoin
+        if is_existence_join(join_type) {
+            return not_impl_err!(
+                "Existence Joins are currently not supported for 
PiecewiseMergeJoin"
+            );
+        }
+
+        // Take the operator and enforce a sort order on the streamed + 
buffered side based on
+        // the operator type.
+        let sort_options = match operator {
+            Operator::Lt | Operator::LtEq => {
+                // For left existence joins the inputs will be swapped so the 
sort
+                // options are switched
+                if is_right_existence_join(join_type) {
+                    SortOptions::new(false, false)
+                } else {
+                    SortOptions::new(true, false)
+                }
+            }
+            Operator::Gt | Operator::GtEq => {
+                if is_right_existence_join(join_type) {
+                    SortOptions::new(true, false)
+                } else {
+                    SortOptions::new(false, false)
+                }
+            }
+            _ => {
+                return internal_err!(
+                    "Cannot contain non-range operator in 
PiecewiseMergeJoinExec"
+                )
+            }
+        };
+
+        // Give the same `sort_option for comparison later`
+        let left_child_plan_required_order =
+            vec![PhysicalSortExpr::new(Arc::clone(&on.0), sort_options)];
+        let right_batch_required_orders =
+            vec![PhysicalSortExpr::new(Arc::clone(&on.1), sort_options)];
+
+        let Some(left_child_plan_required_order) =
+            LexOrdering::new(left_child_plan_required_order)
+        else {
+            return internal_err!(
+                "PiecewiseMergeJoinExec requires valid sort expressions for 
its left side"
+            );
+        };
+        let Some(right_batch_required_orders) =
+            LexOrdering::new(right_batch_required_orders)
+        else {
+            return internal_err!(
+                "PiecewiseMergeJoinExec requires valid sort expressions for 
its right side"
+            );
+        };
+
+        let buffered_schema = buffered.schema();
+        let streamed_schema = streamed.schema();
+
+        // Create output schema for the join
+        let schema =
+            Arc::new(build_join_schema(&buffered_schema, &streamed_schema, 
&join_type).0);
+        let cache = Self::compute_properties(
+            &buffered,
+            &streamed,
+            Arc::clone(&schema),
+            join_type,
+            &on,
+        )?;
+
+        Ok(Self {
+            streamed,
+            buffered,
+            on,
+            operator,
+            join_type,
+            schema,
+            buffered_fut: Default::default(),
+            metrics: ExecutionPlanMetricsSet::new(),
+            left_child_plan_required_order,
+            right_batch_required_orders,
+            sort_options,
+            cache,
+            remaining_partitions: num_partitions,
+        })
+    }
+
+    /// Reference to buffered side execution plan
+    pub fn buffered(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.buffered
+    }
+
+    /// Reference to streamed side execution plan
+    pub fn streamed(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.streamed
+    }
+
+    /// Join type
+    pub fn join_type(&self) -> JoinType {
+        self.join_type
+    }
+
+    /// Reference to sort options
+    pub fn sort_options(&self) -> &SortOptions {
+        &self.sort_options
+    }
+
+    /// Get probe side (streamed side) for the PiecewiseMergeJoin
+    /// In current implementation, probe side is determined according to join 
type.
+    pub fn probe_side(join_type: &JoinType) -> JoinSide {
+        match join_type {
+            JoinType::Right
+            | JoinType::Inner
+            | JoinType::Full
+            | JoinType::RightSemi
+            | JoinType::RightAnti
+            | JoinType::RightMark => JoinSide::Right,
+            JoinType::Left
+            | JoinType::LeftAnti
+            | JoinType::LeftSemi
+            | JoinType::LeftMark => JoinSide::Left,
+        }
+    }
+
+    pub fn compute_properties(
+        buffered: &Arc<dyn ExecutionPlan>,
+        streamed: &Arc<dyn ExecutionPlan>,
+        schema: SchemaRef,
+        join_type: JoinType,
+        join_on: &(PhysicalExprRef, PhysicalExprRef),
+    ) -> Result<PlanProperties> {
+        let eq_properties = join_equivalence_properties(
+            buffered.equivalence_properties().clone(),
+            streamed.equivalence_properties().clone(),
+            &join_type,
+            schema,
+            &Self::maintains_input_order(join_type),
+            Some(Self::probe_side(&join_type)),
+            std::slice::from_ref(join_on),
+        )?;
+
+        let output_partitioning =
+            asymmetric_join_output_partitioning(buffered, streamed, 
&join_type)?;
+
+        Ok(PlanProperties::new(
+            eq_properties,
+            output_partitioning,
+            EmissionType::Incremental,
+            boundedness_from_children([buffered, streamed]),
+        ))
+    }
+
+    // TODO: Add input order

Review Comment:
   ```suggestion
       // TODO: Add input order. Now they're all `false` indicating it will not 
maintain the input order.
       // However, for certain join types the order is maintained. This can be 
updated in the future after
       // more testing.
   ```



##########
datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs:
##########
@@ -0,0 +1,1533 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream Implementation for PiecewiseMergeJoin's Classic Join (Left, Right, 
Full, Inner)
+
+use arrow::array::{new_null_array, Array, PrimitiveBuilder};
+use arrow::compute::{take, BatchCoalescer};
+use arrow::datatypes::UInt32Type;
+use arrow::{
+    array::{ArrayRef, RecordBatch, UInt32Array},
+    compute::{sort_to_indices, take_record_batch},
+};
+use arrow_schema::{Schema, SchemaRef, SortOptions};
+use datafusion_common::NullEquality;
+use datafusion_common::{exec_err, internal_err, Result};
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_expr::{JoinType, Operator};
+use datafusion_physical_expr::PhysicalExprRef;
+use futures::{Stream, StreamExt};
+use std::sync::atomic::AtomicUsize;
+use std::{cmp::Ordering, task::ready};
+use std::{sync::Arc, task::Poll};
+
+use crate::handle_state;
+use crate::joins::piecewise_merge_join::exec::{BufferedSide, 
BufferedSideReadyState};
+use crate::joins::piecewise_merge_join::utils::need_produce_result_in_final;
+use crate::joins::utils::{compare_join_arrays, 
get_final_indices_from_shared_bitmap};
+use crate::joins::utils::{BuildProbeJoinMetrics, StatefulStreamResult};
+
+pub(super) enum PiecewiseMergeJoinStreamState {
+    WaitBufferedSide,
+    FetchStreamBatch,
+    ProcessStreamBatch(StreamedBatch),
+    ExhaustedStreamSide,
+    Completed,
+}
+
+impl PiecewiseMergeJoinStreamState {
+    // Grab mutable reference to the current stream batch
+    fn try_as_process_stream_batch_mut(&mut self) -> Result<&mut 
StreamedBatch> {
+        match self {
+            PiecewiseMergeJoinStreamState::ProcessStreamBatch(state) => 
Ok(state),
+            _ => internal_err!("Expected streamed batch in StreamBatch"),
+        }
+    }
+}
+
+pub(super) struct StreamedBatch {
+    pub batch: RecordBatch,
+    values: Vec<ArrayRef>,
+}
+
+impl StreamedBatch {
+    #[allow(dead_code)]
+    fn new(batch: RecordBatch, values: Vec<ArrayRef>) -> Self {
+        Self { batch, values }
+    }
+
+    fn values(&self) -> &Vec<ArrayRef> {
+        &self.values
+    }
+}
+
+pub(super) struct ClassicPWMJStream {
+    // Output schema of the `PiecewiseMergeJoin`
+    pub schema: Arc<Schema>,
+
+    // Physical expression that is evaluated on the streamed side
+    // We do not need on_buffered as this is already evaluated when
+    // creating the buffered side which happens before initializing
+    // `PiecewiseMergeJoinStream`
+    pub on_streamed: PhysicalExprRef,
+    // Type of join
+    pub join_type: JoinType,
+    // Comparison operator
+    pub operator: Operator,
+    // Streamed batch
+    pub streamed: SendableRecordBatchStream,
+    // Streamed schema
+    streamed_schema: SchemaRef,
+    // Buffered side data
+    buffered_side: BufferedSide,
+    // Tracks the state of the `PiecewiseMergeJoin`
+    state: PiecewiseMergeJoinStreamState,
+    // Sort option for buffered and streamed side (specifies whether
+    // the sort is ascending or descending)
+    sort_option: SortOptions,
+    // Metrics for build + probe joins
+    join_metrics: BuildProbeJoinMetrics,
+    // Tracking incremental state for emitting record batches
+    batch_process_state: BatchProcessState,
+    // To synchronize when partition needs to finish
+    remaining_partitions: Arc<AtomicUsize>,
+}
+
+impl RecordBatchStream for ClassicPWMJStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
+// `PiecewiseMergeJoinStreamState` is separated into `WaitBufferedSide`, 
`FetchStreamBatch`,
+// `ProcessStreamBatch`, `ExhaustedStreamSide` and `Completed`.
+//
+// Classic Joins
+//  1. `WaitBufferedSide` - Load in the buffered side data into memory.
+//  2. `FetchStreamBatch` -  Fetch + sort incoming stream batches. We switch 
the state to
+//     `Completed` if there are are still remaining partitions to process. It 
is only switched to
+//     `ExhaustedStreamBatch` if all partitions have been processed.
+//  3. `ProcessStreamBatch` - Compare stream batch row values against the 
buffered side data.
+//  4. `ExhaustedStreamBatch` - If the join type is Left or Inner we will 
return state as
+//      `Completed` however for Full and Right we will need to process the 
unmatched buffered rows.
+impl ClassicPWMJStream {
+    // Creates a new `PiecewiseMergeJoinStream` instance
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        schema: Arc<Schema>,
+        on_streamed: PhysicalExprRef,
+        join_type: JoinType,
+        operator: Operator,
+        streamed: SendableRecordBatchStream,
+        buffered_side: BufferedSide,
+        state: PiecewiseMergeJoinStreamState,
+        sort_option: SortOptions,
+        join_metrics: BuildProbeJoinMetrics,
+        batch_size: usize,
+        remaining_partitions: Arc<AtomicUsize>,
+    ) -> Self {
+        Self {
+            schema: Arc::clone(&schema),
+            on_streamed,
+            join_type,
+            operator,
+            streamed_schema: streamed.schema(),
+            streamed,
+            buffered_side,
+            state,
+            sort_option,
+            join_metrics,
+            batch_process_state: BatchProcessState::new(schema, batch_size),
+            remaining_partitions,
+        }
+    }
+
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        loop {
+            return match self.state {
+                PiecewiseMergeJoinStreamState::WaitBufferedSide => {
+                    handle_state!(ready!(self.collect_buffered_side(cx)))
+                }
+                PiecewiseMergeJoinStreamState::FetchStreamBatch => {
+                    handle_state!(ready!(self.fetch_stream_batch(cx)))
+                }
+                PiecewiseMergeJoinStreamState::ProcessStreamBatch(_) => {
+                    handle_state!(self.process_stream_batch())
+                }
+                PiecewiseMergeJoinStreamState::ExhaustedStreamSide => {
+                    handle_state!(self.process_unmatched_buffered_batch())
+                }
+                PiecewiseMergeJoinStreamState::Completed => Poll::Ready(None),
+            };
+        }
+    }
+
+    // Collects buffered side data
+    fn collect_buffered_side(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+        let build_timer = self.join_metrics.build_time.timer();
+        let buffered_data = ready!(self
+            .buffered_side
+            .try_as_initial_mut()?
+            .buffered_fut
+            .get_shared(cx))?;
+        build_timer.done();
+
+        // We will start fetching stream batches for classic joins
+        self.state = PiecewiseMergeJoinStreamState::FetchStreamBatch;
+
+        self.buffered_side =
+            BufferedSide::Ready(BufferedSideReadyState { buffered_data });
+
+        Poll::Ready(Ok(StatefulStreamResult::Continue))
+    }
+
+    // Fetches incoming stream batches
+    fn fetch_stream_batch(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+        match ready!(self.streamed.poll_next_unpin(cx)) {
+            None => {
+                if self
+                    .remaining_partitions
+                    .fetch_sub(1, std::sync::atomic::Ordering::SeqCst)
+                    == 1
+                {
+                    self.batch_process_state.reset();
+                    self.state = 
PiecewiseMergeJoinStreamState::ExhaustedStreamSide;
+                } else {
+                    self.state = PiecewiseMergeJoinStreamState::Completed;
+                }
+            }
+            Some(Ok(batch)) => {
+                // Evaluate the streamed physical expression on the stream 
batch
+                let stream_values: ArrayRef = self
+                    .on_streamed
+                    .evaluate(&batch)?
+                    .into_array(batch.num_rows())?;
+
+                self.join_metrics.input_batches.add(1);
+                self.join_metrics.input_rows.add(batch.num_rows());
+
+                // Sort stream values and change the streamed record batch 
accordingly
+                let indices = sort_to_indices(
+                    stream_values.as_ref(),
+                    Some(self.sort_option),
+                    None,
+                )?;
+                let stream_batch = take_record_batch(&batch, &indices)?;
+                let stream_values = take(stream_values.as_ref(), &indices, 
None)?;
+
+                // Reset BatchProcessState before processing a new stream batch
+                self.batch_process_state.reset();
+                self.state =
+                    
PiecewiseMergeJoinStreamState::ProcessStreamBatch(StreamedBatch {
+                        batch: stream_batch,
+                        values: vec![stream_values],
+                    });
+            }
+            Some(Err(err)) => return Poll::Ready(Err(err)),
+        };
+
+        Poll::Ready(Ok(StatefulStreamResult::Continue))
+    }
+
+    // Only classic join will call. This function will process stream batches 
and evaluate against
+    // the buffered side data.
+    fn process_stream_batch(
+        &mut self,
+    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
+        let buffered_side = self.buffered_side.try_as_ready_mut()?;
+        let stream_batch = self.state.try_as_process_stream_batch_mut()?;
+
+        if let Some(batch) = self
+            .batch_process_state
+            .output_batches
+            .next_completed_batch()
+        {
+            return Ok(StatefulStreamResult::Ready(Some(batch)));
+        }
+
+        // Produce more work
+        let batch = resolve_classic_join(
+            buffered_side,
+            stream_batch,
+            Arc::clone(&self.schema),
+            self.operator,
+            self.sort_option,
+            self.join_type,
+            &mut self.batch_process_state,
+        )?;
+
+        if !self.batch_process_state.continue_process {
+            // We finished scanning this stream batch.
+            self.batch_process_state
+                .output_batches
+                .finish_buffered_batch()?;
+            if let Some(b) = self
+                .batch_process_state
+                .output_batches
+                .next_completed_batch()
+            {
+                self.state = PiecewiseMergeJoinStreamState::FetchStreamBatch;
+                return Ok(StatefulStreamResult::Ready(Some(b)));
+            }
+            // Nothing pending; hand back whatever `resolve` returned (often 
empty) and move on.
+            self.state = PiecewiseMergeJoinStreamState::FetchStreamBatch;
+            return Ok(StatefulStreamResult::Ready(Some(batch)));
+        }
+
+        Ok(StatefulStreamResult::Ready(Some(batch)))
+    }
+
+    // Process remaining unmatched rows
+    fn process_unmatched_buffered_batch(
+        &mut self,
+    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
+        // Return early for `JoinType::Right` and `JoinType::Inner`
+        if matches!(self.join_type, JoinType::Right | JoinType::Inner) {
+            self.state = PiecewiseMergeJoinStreamState::Completed;
+            return Ok(StatefulStreamResult::Ready(None));
+        }
+
+        if !self.batch_process_state.continue_process {
+            if let Some(batch) = self
+                .batch_process_state
+                .output_batches
+                .next_completed_batch()
+            {
+                return Ok(StatefulStreamResult::Ready(Some(batch)));
+            }
+
+            self.batch_process_state
+                .output_batches
+                .finish_buffered_batch()?;
+            if let Some(batch) = self
+                .batch_process_state
+                .output_batches
+                .next_completed_batch()
+            {
+                self.state = PiecewiseMergeJoinStreamState::Completed;
+                return Ok(StatefulStreamResult::Ready(Some(batch)));
+            }
+        }
+
+        let buffered_data =
+            
Arc::clone(&self.buffered_side.try_as_ready().unwrap().buffered_data);
+
+        let (buffered_indices, _streamed_indices) = 
get_final_indices_from_shared_bitmap(
+            &buffered_data.visited_indices_bitmap,
+            self.join_type,
+            true,
+        );
+
+        let new_buffered_batch =
+            take_record_batch(buffered_data.batch(), &buffered_indices)?;
+        let mut buffered_columns = new_buffered_batch.columns().to_vec();
+
+        let streamed_columns: Vec<ArrayRef> = self
+            .streamed_schema
+            .fields()
+            .iter()
+            .map(|f| new_null_array(f.data_type(), 
new_buffered_batch.num_rows()))
+            .collect();
+
+        buffered_columns.extend(streamed_columns);
+
+        let batch = RecordBatch::try_new(Arc::clone(&self.schema), 
buffered_columns)?;
+
+        self.batch_process_state.output_batches.push_batch(batch)?;
+
+        self.batch_process_state.continue_process = false;
+        if let Some(batch) = self
+            .batch_process_state
+            .output_batches
+            .next_completed_batch()
+        {
+            return Ok(StatefulStreamResult::Ready(Some(batch)));
+        }
+
+        self.batch_process_state
+            .output_batches
+            .finish_buffered_batch()?;
+        if let Some(batch) = self
+            .batch_process_state
+            .output_batches
+            .next_completed_batch()
+        {
+            self.state = PiecewiseMergeJoinStreamState::Completed;
+            return Ok(StatefulStreamResult::Ready(Some(batch)));
+        }
+
+        self.state = PiecewiseMergeJoinStreamState::Completed;
+        self.batch_process_state.reset();
+        Ok(StatefulStreamResult::Ready(None))
+    }
+}
+
+struct BatchProcessState {
+    // Used to pick up from the last index on the stream side
+    output_batches: Box<BatchCoalescer>,
+    // Used to store the unmatched stream indices for `JoinType::Right` and 
`JoinType::Full`
+    unmatched_indices: PrimitiveBuilder<UInt32Type>,
+    // Used to store the start index on the buffered side; used to resume 
processing on the correct
+    // row
+    start_buffer_idx: usize,
+    // Used to store the start index on the stream side; used to resume 
processing on the correct
+    // row
+    start_stream_idx: usize,
+    // Signals if we found a match for the current stream row
+    found: bool,
+    // Signals to continue processing the current stream batch
+    continue_process: bool,
+}
+
+impl BatchProcessState {
+    pub(crate) fn new(schema: Arc<Schema>, batch_size: usize) -> Self {
+        Self {
+            output_batches: Box::new(BatchCoalescer::new(schema, batch_size)),
+            unmatched_indices: PrimitiveBuilder::new(),
+            start_buffer_idx: 0,
+            start_stream_idx: 0,
+            found: false,
+            continue_process: true,
+        }
+    }
+
+    pub(crate) fn reset(&mut self) {

Review Comment:
   should we add some sanity check here? like `BatchCoalescer` must be empty 
when calling reset



##########
datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs:
##########
@@ -0,0 +1,735 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::Array;
+use arrow::{
+    array::{ArrayRef, BooleanBufferBuilder, RecordBatch},
+    compute::concat_batches,
+    util::bit_util,
+};
+use arrow_schema::{SchemaRef, SortOptions};
+use datafusion_common::not_impl_err;
+use datafusion_common::{internal_err, JoinSide, Result};
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    SendableRecordBatchStream,
+};
+use datafusion_expr::{JoinType, Operator};
+use datafusion_physical_expr::equivalence::join_equivalence_properties;
+use datafusion_physical_expr::{
+    Distribution, LexOrdering, OrderingRequirements, PhysicalExpr, 
PhysicalExprRef,
+    PhysicalSortExpr,
+};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use futures::TryStreamExt;
+use parking_lot::Mutex;
+use std::fmt::Formatter;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+use crate::execution_plan::{boundedness_from_children, EmissionType};
+
+use crate::joins::piecewise_merge_join::classic_join::{
+    ClassicPWMJStream, PiecewiseMergeJoinStreamState,
+};
+use crate::joins::piecewise_merge_join::utils::{
+    build_visited_indices_map, is_existence_join, is_right_existence_join,
+};
+use crate::joins::utils::asymmetric_join_output_partitioning;
+use crate::{
+    joins::{
+        utils::{build_join_schema, BuildProbeJoinMetrics, OnceAsync, OnceFut},
+        SharedBitmapBuilder,
+    },
+    metrics::ExecutionPlanMetricsSet,
+    spill::get_record_batch_memory_size,
+    ExecutionPlan, PlanProperties,
+};
+use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties};
+
+/// `PiecewiseMergeJoinExec` is a join execution plan that only evaluates 
single range filter and show much
+/// better performance for these workloads than `NestedLoopJoin`
+///
+/// The physical planner will choose to evaluate this join when there is only 
one comparison filter. This
+/// is a binary expression which contains [`Operator::Lt`], 
[`Operator::LtEq`], [`Operator::Gt`], and
+/// [`Operator::GtEq`].:
+/// Examples:
+///  - `col0` < `colb`, `col0` <= `colb`, `col0` > `colb`, `col0` >= `colb`
+///
+/// # Execution Plan Inputs
+/// For `PiecewiseMergeJoin` we label all right inputs as the `streamed' side 
and the left outputs as the
+/// 'buffered' side.
+///
+/// `PiecewiseMergeJoin` takes a sorted input for the side to be buffered and 
is able to sort streamed record
+/// batches during processing. Sorted input must specifically be 
ascending/descending based on the operator.
+///
+/// # Algorithms
+/// Classic joins are processed differently compared to existence joins.
+///
+/// ## Classic Joins (Inner, Full, Left, Right)
+/// For classic joins we buffer the right side (the "build" side) and stream 
the left side (the "probe" side).

Review Comment:
   Here it's referencing the right side as the build side, and the 
implementation is doing the opposite. Perhaps we can remove left/right here and 
only use buffer/stream to avoid confusion?



##########
datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs:
##########
@@ -0,0 +1,735 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::Array;
+use arrow::{
+    array::{ArrayRef, BooleanBufferBuilder, RecordBatch},
+    compute::concat_batches,
+    util::bit_util,
+};
+use arrow_schema::{SchemaRef, SortOptions};
+use datafusion_common::not_impl_err;
+use datafusion_common::{internal_err, JoinSide, Result};
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    SendableRecordBatchStream,
+};
+use datafusion_expr::{JoinType, Operator};
+use datafusion_physical_expr::equivalence::join_equivalence_properties;
+use datafusion_physical_expr::{
+    Distribution, LexOrdering, OrderingRequirements, PhysicalExpr, 
PhysicalExprRef,
+    PhysicalSortExpr,
+};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use futures::TryStreamExt;
+use parking_lot::Mutex;
+use std::fmt::Formatter;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+use crate::execution_plan::{boundedness_from_children, EmissionType};
+
+use crate::joins::piecewise_merge_join::classic_join::{
+    ClassicPWMJStream, PiecewiseMergeJoinStreamState,
+};
+use crate::joins::piecewise_merge_join::utils::{
+    build_visited_indices_map, is_existence_join, is_right_existence_join,
+};
+use crate::joins::utils::asymmetric_join_output_partitioning;
+use crate::{
+    joins::{
+        utils::{build_join_schema, BuildProbeJoinMetrics, OnceAsync, OnceFut},
+        SharedBitmapBuilder,
+    },
+    metrics::ExecutionPlanMetricsSet,
+    spill::get_record_batch_memory_size,
+    ExecutionPlan, PlanProperties,
+};
+use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties};
+
+/// `PiecewiseMergeJoinExec` is a join execution plan that only evaluates 
single range filter and show much
+/// better performance for these workloads than `NestedLoopJoin`
+///
+/// The physical planner will choose to evaluate this join when there is only 
one comparison filter. This
+/// is a binary expression which contains [`Operator::Lt`], 
[`Operator::LtEq`], [`Operator::Gt`], and
+/// [`Operator::GtEq`].:
+/// Examples:
+///  - `col0` < `colb`, `col0` <= `colb`, `col0` > `colb`, `col0` >= `colb`
+///
+/// # Execution Plan Inputs
+/// For `PiecewiseMergeJoin` we label all right inputs as the `streamed' side 
and the left outputs as the
+/// 'buffered' side.
+///
+/// `PiecewiseMergeJoin` takes a sorted input for the side to be buffered and 
is able to sort streamed record
+/// batches during processing. Sorted input must specifically be 
ascending/descending based on the operator.
+///
+/// # Algorithms
+/// Classic joins are processed differently compared to existence joins.
+///
+/// ## Classic Joins (Inner, Full, Left, Right)
+/// For classic joins we buffer the right side (the "build" side) and stream 
the left side (the "probe" side).
+/// Both sides are sorted so that we can iterate from index 0 to the end on 
each side.  This ordering ensures
+/// that when we find the first matching pair of rows, we can emit the current 
left row joined with all remaining
+/// right rows from the match position onward, without rescanning earlier 
right rows.
+///  
+/// For `<` and `<=` operators, both inputs are sorted in **descending** 
order, while for `>` and `>=` operators
+/// they are sorted in **ascending** order. This choice ensures that the 
pointer on the buffered side can advance
+/// monotonically as we stream new batches from the left side.
+///
+/// The streamed (left) side may arrive unsorted, so this operator sorts each 
incoming batch in memory before
+/// processing. The buffered (right) side is required to be globally sorted; 
the plan declares this requirement
+/// in `requires_input_order`, which allows the optimizer to automatically 
insert a `SortExec` on that side if needed.
+/// By the time this operator runs, the right side is guaranteed to be in the 
proper order.
+///
+/// The pseudocode for the algorithm looks like this:
+///
+/// ```text
+/// for stream_row in stream_batch:
+///     for buffer_row in buffer_batch:
+///         if compare(stream_row, probe_row):
+///             output stream_row X buffer_batch[buffer_row:]
+///         else:
+///             continue
+/// ```
+///
+/// The algorithm uses the streamed side (larger) to drive the loop. This is 
due to every row on the stream side iterating
+/// the buffered side to find every first match. By doing this, each match can 
output more result so that output
+/// handling can be better vectorized for performance.
+///
+/// Here is an example:
+///
+/// We perform a `JoinType::Left` with these two batches and the operator 
being `Operator::Lt`(<). For each
+/// row on the streamed side we move a pointer on the buffered until it 
matches the condition. Once we reach
+/// the row which matches (in this case with row 1 on streamed will have its 
first match on row 2 on
+/// buffered; 100 < 200 is true), we can emit all rows after that match. We 
can emit the rows like this because
+/// if the batch is sorted in ascending order, every subsequent row will also 
satisfy the condition as they will
+/// all be larger values.
+///
+/// ```text
+/// SQL statement:
+/// SELECT *
+/// FROM (VALUES (100), (200), (500)) AS streamed(a)
+/// LEFT JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
+///   ON streamed.a < buffered.b;
+///
+/// Processing Row 1:
+///
+///       Sorted Buffered Side                                         Sorted 
Streamed Side          
+///       ┌──────────────────┐                                         
┌──────────────────┐         
+///     1 │       100        │                                       1 │       
100        │        
+///       ├──────────────────┤                                         
├──────────────────┤         
+///     2 │       200        │ ─┐                                    2 │       
200        │        
+///       ├──────────────────┤  │  For row 1 on streamed side with     
├──────────────────┤         
+///     3 │       200        │  │  value 100, we emit rows 2 - 5.    3 │       
500        │       
+///       ├──────────────────┤  │  as matches when the operator is     
└──────────────────┘
+///     4 │       300        │  │  `Operator::Lt` (<) Emitting all
+///       ├──────────────────┤  │  rows after the first match (row
+///     5 │       400        │ ─┘  2 buffered side; 100 < 200)
+///       └──────────────────┘     
+///
+/// Processing Row 2:
+///   By sorting the streamed side we know
+///
+///       Sorted Buffered Side                                         Sorted 
Streamed Side          
+///       ┌──────────────────┐                                         
┌──────────────────┐         
+///     1 │       100        │                                       1 │       
100        │        
+///       ├──────────────────┤                                         
├──────────────────┤         
+///     2 │       200        │ <- Start here when probing for the    2 │       
200        │        
+///       ├──────────────────┤    streamed side row 2.                 
├──────────────────┤         
+///     3 │       200        │                                       3 │       
500        │       
+///       ├──────────────────┤                                         
└──────────────────┘
+///     4 │       300        │  
+///       ├──────────────────┤  
+///     5 │       400        │
+///       └──────────────────┘     
+///
+/// ```
+///
+/// ## Existence Joins (Semi, Anti, Mark)
+/// Existence joins are made magnitudes of times faster with a 
`PiecewiseMergeJoin` as we only need to find
+/// the min/max value of the streamed side to be able to emit all matches on 
the buffered side. By putting
+/// the side we need to mark onto the sorted buffer side, we can emit all 
these matches at once.
+///
+/// For less than operations (`<`) both inputs are to be sorted in descending 
order and vice versa for greater
+/// than (`>`) operations. `SortExec` is used to enforce sorting on the 
buffered side and streamed side does not
+/// need to be sorted due to only needing to find the min/max.
+///
+/// For Left Semi, Anti, and Mark joins we swap the inputs so that the marked 
side is on the buffered side.
+///
+/// The pseudocode for the algorithm looks like this:
+///
+/// ```text
+/// // Using the example of a less than `<` operation
+/// let max = max_batch(streamed_batch)
+///
+/// for buffer_row in buffer_batch:
+///     if buffer_row < max:
+///         output buffer_batch[buffer_row:]
+/// ```
+///
+/// Only need to find the min/max value and iterate through the buffered side 
once.
+///
+/// Here is an example:
+/// We perform a `JoinType::LeftSemi` with these two batches and the operator 
being `Operator::Lt`(<). Because
+/// the operator is `Operator::Lt` we can find the minimum value in the 
streamed side; in this case it is 200.
+/// We can then advance a pointer from the start of the buffer side until we 
find the first value that satisfies
+/// the predicate. All rows after that first matched value satisfy the 
condition 200 < x so we can mark all of
+/// those rows as matched.
+///
+/// ```text
+/// SQL statement:
+/// SELECT *
+/// FROM (VALUES (500), (200), (300)) AS streamed(a)
+/// LEFT SEMI JOIN (VALUES (100), (200), (200), (300), (400)) AS buffered(b)
+///   ON streamed.a < buffered.b;
+///
+///          Sorted Buffered Side             Unsorted Streamed Side
+///            ┌──────────────────┐          ┌──────────────────┐
+///          1 │       100        │        1 │       500        │
+///            ├──────────────────┤          ├──────────────────┤
+///          2 │       200        │        2 │       200        │
+///            ├──────────────────┤          ├──────────────────┤    
+///          3 │       200        │        3 │       300        │
+///            ├──────────────────┤          └──────────────────┘
+///          4 │       300        │ ─┐       
+///            ├──────────────────┤  | We emit matches for row 4 - 5
+///          5 │       400        │ ─┘ on the buffered side.
+///            └──────────────────┘
+///             min value: 200
+/// ```
+///
+/// For both types of joins, the buffered side must be sorted ascending for 
`Operator::Lt` (<) or
+/// `Operator::LtEq` (<=) and descending for `Operator::Gt` (>) or 
`Operator::GtEq` (>=).
+///

Review Comment:
   It would be great to add a section here explain the parallelism and 
synchronization, here are some important ideas to mention:
   - All partition is using a single shared buffered side data, and they'll be 
synchronized when loading the buffer side input.
   - Only a single partition is responsible for output the unmatched buffer 
side entries for left/full joins



##########
datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs:
##########
@@ -0,0 +1,1533 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream Implementation for PiecewiseMergeJoin's Classic Join (Left, Right, 
Full, Inner)
+
+use arrow::array::{new_null_array, Array, PrimitiveBuilder};
+use arrow::compute::{take, BatchCoalescer};
+use arrow::datatypes::UInt32Type;
+use arrow::{
+    array::{ArrayRef, RecordBatch, UInt32Array},
+    compute::{sort_to_indices, take_record_batch},
+};
+use arrow_schema::{Schema, SchemaRef, SortOptions};
+use datafusion_common::NullEquality;
+use datafusion_common::{exec_err, internal_err, Result};
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_expr::{JoinType, Operator};
+use datafusion_physical_expr::PhysicalExprRef;
+use futures::{Stream, StreamExt};
+use std::sync::atomic::AtomicUsize;
+use std::{cmp::Ordering, task::ready};
+use std::{sync::Arc, task::Poll};
+
+use crate::handle_state;
+use crate::joins::piecewise_merge_join::exec::{BufferedSide, 
BufferedSideReadyState};
+use crate::joins::piecewise_merge_join::utils::need_produce_result_in_final;
+use crate::joins::utils::{compare_join_arrays, 
get_final_indices_from_shared_bitmap};
+use crate::joins::utils::{BuildProbeJoinMetrics, StatefulStreamResult};
+
+pub(super) enum PiecewiseMergeJoinStreamState {
+    WaitBufferedSide,
+    FetchStreamBatch,
+    ProcessStreamBatch(StreamedBatch),
+    ExhaustedStreamSide,
+    Completed,
+}
+
+impl PiecewiseMergeJoinStreamState {
+    // Grab mutable reference to the current stream batch
+    fn try_as_process_stream_batch_mut(&mut self) -> Result<&mut 
StreamedBatch> {
+        match self {
+            PiecewiseMergeJoinStreamState::ProcessStreamBatch(state) => 
Ok(state),
+            _ => internal_err!("Expected streamed batch in StreamBatch"),
+        }
+    }
+}
+
+pub(super) struct StreamedBatch {
+    pub batch: RecordBatch,
+    values: Vec<ArrayRef>,
+}
+
+impl StreamedBatch {
+    #[allow(dead_code)]
+    fn new(batch: RecordBatch, values: Vec<ArrayRef>) -> Self {
+        Self { batch, values }
+    }
+
+    fn values(&self) -> &Vec<ArrayRef> {
+        &self.values
+    }
+}
+
+pub(super) struct ClassicPWMJStream {
+    // Output schema of the `PiecewiseMergeJoin`
+    pub schema: Arc<Schema>,
+
+    // Physical expression that is evaluated on the streamed side
+    // We do not need on_buffered as this is already evaluated when
+    // creating the buffered side which happens before initializing
+    // `PiecewiseMergeJoinStream`
+    pub on_streamed: PhysicalExprRef,
+    // Type of join
+    pub join_type: JoinType,
+    // Comparison operator
+    pub operator: Operator,
+    // Streamed batch
+    pub streamed: SendableRecordBatchStream,
+    // Streamed schema
+    streamed_schema: SchemaRef,
+    // Buffered side data
+    buffered_side: BufferedSide,
+    // Tracks the state of the `PiecewiseMergeJoin`
+    state: PiecewiseMergeJoinStreamState,
+    // Sort option for buffered and streamed side (specifies whether
+    // the sort is ascending or descending)
+    sort_option: SortOptions,
+    // Metrics for build + probe joins
+    join_metrics: BuildProbeJoinMetrics,
+    // Tracking incremental state for emitting record batches
+    batch_process_state: BatchProcessState,
+    // To synchronize when partition needs to finish
+    remaining_partitions: Arc<AtomicUsize>,
+}
+
+impl RecordBatchStream for ClassicPWMJStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
+// `PiecewiseMergeJoinStreamState` is separated into `WaitBufferedSide`, 
`FetchStreamBatch`,
+// `ProcessStreamBatch`, `ExhaustedStreamSide` and `Completed`.
+//
+// Classic Joins
+//  1. `WaitBufferedSide` - Load in the buffered side data into memory.
+//  2. `FetchStreamBatch` -  Fetch + sort incoming stream batches. We switch 
the state to
+//     `Completed` if there are are still remaining partitions to process. It 
is only switched to
+//     `ExhaustedStreamBatch` if all partitions have been processed.
+//  3. `ProcessStreamBatch` - Compare stream batch row values against the 
buffered side data.
+//  4. `ExhaustedStreamBatch` - If the join type is Left or Inner we will 
return state as
+//      `Completed` however for Full and Right we will need to process the 
unmatched buffered rows.
+impl ClassicPWMJStream {
+    // Creates a new `PiecewiseMergeJoinStream` instance
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        schema: Arc<Schema>,
+        on_streamed: PhysicalExprRef,
+        join_type: JoinType,
+        operator: Operator,
+        streamed: SendableRecordBatchStream,
+        buffered_side: BufferedSide,
+        state: PiecewiseMergeJoinStreamState,
+        sort_option: SortOptions,
+        join_metrics: BuildProbeJoinMetrics,
+        batch_size: usize,
+        remaining_partitions: Arc<AtomicUsize>,
+    ) -> Self {
+        Self {
+            schema: Arc::clone(&schema),
+            on_streamed,
+            join_type,
+            operator,
+            streamed_schema: streamed.schema(),
+            streamed,
+            buffered_side,
+            state,
+            sort_option,
+            join_metrics,
+            batch_process_state: BatchProcessState::new(schema, batch_size),
+            remaining_partitions,
+        }
+    }
+
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        loop {
+            return match self.state {
+                PiecewiseMergeJoinStreamState::WaitBufferedSide => {
+                    handle_state!(ready!(self.collect_buffered_side(cx)))
+                }
+                PiecewiseMergeJoinStreamState::FetchStreamBatch => {
+                    handle_state!(ready!(self.fetch_stream_batch(cx)))
+                }
+                PiecewiseMergeJoinStreamState::ProcessStreamBatch(_) => {
+                    handle_state!(self.process_stream_batch())
+                }
+                PiecewiseMergeJoinStreamState::ExhaustedStreamSide => {
+                    handle_state!(self.process_unmatched_buffered_batch())
+                }
+                PiecewiseMergeJoinStreamState::Completed => Poll::Ready(None),
+            };
+        }
+    }
+
+    // Collects buffered side data
+    fn collect_buffered_side(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+        let build_timer = self.join_metrics.build_time.timer();
+        let buffered_data = ready!(self
+            .buffered_side
+            .try_as_initial_mut()?
+            .buffered_fut
+            .get_shared(cx))?;
+        build_timer.done();
+
+        // We will start fetching stream batches for classic joins
+        self.state = PiecewiseMergeJoinStreamState::FetchStreamBatch;
+
+        self.buffered_side =
+            BufferedSide::Ready(BufferedSideReadyState { buffered_data });
+
+        Poll::Ready(Ok(StatefulStreamResult::Continue))
+    }
+
+    // Fetches incoming stream batches
+    fn fetch_stream_batch(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+        match ready!(self.streamed.poll_next_unpin(cx)) {
+            None => {
+                if self
+                    .remaining_partitions
+                    .fetch_sub(1, std::sync::atomic::Ordering::SeqCst)
+                    == 1
+                {
+                    self.batch_process_state.reset();
+                    self.state = 
PiecewiseMergeJoinStreamState::ExhaustedStreamSide;
+                } else {
+                    self.state = PiecewiseMergeJoinStreamState::Completed;
+                }
+            }
+            Some(Ok(batch)) => {
+                // Evaluate the streamed physical expression on the stream 
batch
+                let stream_values: ArrayRef = self
+                    .on_streamed
+                    .evaluate(&batch)?
+                    .into_array(batch.num_rows())?;
+
+                self.join_metrics.input_batches.add(1);
+                self.join_metrics.input_rows.add(batch.num_rows());
+
+                // Sort stream values and change the streamed record batch 
accordingly
+                let indices = sort_to_indices(
+                    stream_values.as_ref(),
+                    Some(self.sort_option),
+                    None,
+                )?;
+                let stream_batch = take_record_batch(&batch, &indices)?;
+                let stream_values = take(stream_values.as_ref(), &indices, 
None)?;
+
+                // Reset BatchProcessState before processing a new stream batch
+                self.batch_process_state.reset();
+                self.state =
+                    
PiecewiseMergeJoinStreamState::ProcessStreamBatch(StreamedBatch {
+                        batch: stream_batch,
+                        values: vec![stream_values],
+                    });
+            }
+            Some(Err(err)) => return Poll::Ready(Err(err)),
+        };
+
+        Poll::Ready(Ok(StatefulStreamResult::Continue))
+    }
+
+    // Only classic join will call. This function will process stream batches 
and evaluate against
+    // the buffered side data.
+    fn process_stream_batch(
+        &mut self,
+    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
+        let buffered_side = self.buffered_side.try_as_ready_mut()?;
+        let stream_batch = self.state.try_as_process_stream_batch_mut()?;
+
+        if let Some(batch) = self
+            .batch_process_state
+            .output_batches
+            .next_completed_batch()
+        {
+            return Ok(StatefulStreamResult::Ready(Some(batch)));
+        }
+
+        // Produce more work
+        let batch = resolve_classic_join(
+            buffered_side,
+            stream_batch,
+            Arc::clone(&self.schema),
+            self.operator,
+            self.sort_option,
+            self.join_type,
+            &mut self.batch_process_state,
+        )?;
+
+        if !self.batch_process_state.continue_process {
+            // We finished scanning this stream batch.
+            self.batch_process_state
+                .output_batches
+                .finish_buffered_batch()?;
+            if let Some(b) = self
+                .batch_process_state
+                .output_batches
+                .next_completed_batch()
+            {
+                self.state = PiecewiseMergeJoinStreamState::FetchStreamBatch;
+                return Ok(StatefulStreamResult::Ready(Some(b)));
+            }
+            // Nothing pending; hand back whatever `resolve` returned (often 
empty) and move on.
+            self.state = PiecewiseMergeJoinStreamState::FetchStreamBatch;

Review Comment:
   Before state transition, can we add a sanity check that there is no 
remaining batch to output in the `BatchCoalescer`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to