This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0d111c4d94 minor: Organize fields inside SortMergeJoinStream (#15557)
0d111c4d94 is described below
commit 0d111c4d946f71969cf237d24c252a2b8d219fc8
Author: suibianwanwan <[email protected]>
AuthorDate: Fri Apr 4 20:38:32 2025 +0800
minor: Organize fields inside SortMergeJoinStream (#15557)
---
.../physical-plan/src/joins/sort_merge_join.rs | 74 +++++++++++++++-------
1 file changed, 50 insertions(+), 24 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 716cff939f..89f2e3c911 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -823,42 +823,65 @@ impl BufferedBatch {
/// Sort-Merge join stream that consumes streamed and buffered data streams
/// and produces joined output stream.
struct SortMergeJoinStream {
- /// Current state of the stream
- pub state: SortMergeJoinState,
+ // ========================================================================
+ // PROPERTIES:
+ // These fields are initialized at the start and remain constant throughout
+ // the execution.
+ // ========================================================================
/// Output schema
pub schema: SchemaRef,
- /// Sort options of join columns used to sort streamed and buffered data
stream
- pub sort_options: Vec<SortOptions>,
/// null == null?
pub null_equals_null: bool,
+ /// Sort options of join columns used to sort streamed and buffered data stream
+ pub sort_options: Vec<SortOptions>,
+ /// optional join filter
+ pub filter: Option<JoinFilter>,
+ /// How the join is performed
+ pub join_type: JoinType,
+ /// Target output batch size
+ pub batch_size: usize,
+
+ // ========================================================================
+ // STREAMED FIELDS:
+ // These fields manage the properties and state of the streamed input.
+ // ========================================================================
/// Input schema of streamed
pub streamed_schema: SchemaRef,
- /// Input schema of buffered
- pub buffered_schema: SchemaRef,
/// Streamed data stream
pub streamed: SendableRecordBatchStream,
- /// Buffered data stream
- pub buffered: SendableRecordBatchStream,
/// Current processing record batch of streamed
pub streamed_batch: StreamedBatch,
- /// Current buffered data
- pub buffered_data: BufferedData,
/// (used in outer join) Is current streamed row joined at least once?
pub streamed_joined: bool,
- /// (used in outer join) Is current buffered batches joined at least once?
- pub buffered_joined: bool,
/// State of streamed
pub streamed_state: StreamedState,
- /// State of buffered
- pub buffered_state: BufferedState,
- /// The comparison result of current streamed row and buffered batches
- pub current_ordering: Ordering,
/// Join key columns of streamed
pub on_streamed: Vec<PhysicalExprRef>,
+
+ // ========================================================================
+ // BUFFERED FIELDS:
+ // These fields manage the properties and state of the buffered input.
+ // ========================================================================
+ /// Input schema of buffered
+ pub buffered_schema: SchemaRef,
+ /// Buffered data stream
+ pub buffered: SendableRecordBatchStream,
+ /// Current buffered data
+ pub buffered_data: BufferedData,
+ /// (used in outer join) Is current buffered batches joined at least once?
+ pub buffered_joined: bool,
+ /// State of buffered
+ pub buffered_state: BufferedState,
/// Join key columns of buffered
pub on_buffered: Vec<PhysicalExprRef>,
- /// optional join filter
- pub filter: Option<JoinFilter>,
+
+ // ========================================================================
+ // MERGE JOIN STATES:
+ // These fields track the execution state of merge join and are updated
+ // during the execution.
+ // ========================================================================
+ /// Current state of the stream
+ pub state: SortMergeJoinState,
/// Staging output array builders
pub staging_output_record_batches: JoinedRecordBatches,
/// Output buffer. Currently used by filtering as it requires double buffering
@@ -868,18 +891,21 @@ struct SortMergeJoinStream {
/// Increased when we put rows into buffer and decreased after we actually output batches.
/// Used to trigger output when sufficient rows are ready
pub output_size: usize,
- /// Target output batch size
- pub batch_size: usize,
- /// How the join is performed
- pub join_type: JoinType,
+ /// The comparison result of current streamed row and buffered batches
+ pub current_ordering: Ordering,
+ /// Manages the process of spilling and reading back intermediate data
+ pub spill_manager: SpillManager,
+
+ // ========================================================================
+ // EXECUTION RESOURCES:
+ // Fields related to managing execution resources and monitoring performance.
+ // ========================================================================
/// Metrics
pub join_metrics: SortMergeJoinMetrics,
/// Memory reservation
pub reservation: MemoryReservation,
/// Runtime env
pub runtime_env: Arc<RuntimeEnv>,
- /// Manages the process of spilling and reading back intermediate data
- pub spill_manager: SpillManager,
/// A unique number for each batch
pub streamed_batch_counter: AtomicUsize,
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]