This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d111c4d94 minor: Organize fields inside SortMergeJoinStream (#15557)
0d111c4d94 is described below

commit 0d111c4d946f71969cf237d24c252a2b8d219fc8
Author: suibianwanwan <[email protected]>
AuthorDate: Fri Apr 4 20:38:32 2025 +0800

    minor: Organize fields inside SortMergeJoinStream (#15557)
---
 .../physical-plan/src/joins/sort_merge_join.rs     | 74 +++++++++++++++-------
 1 file changed, 50 insertions(+), 24 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 716cff939f..89f2e3c911 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -823,42 +823,65 @@ impl BufferedBatch {
 /// Sort-Merge join stream that consumes streamed and buffered data streams
 /// and produces joined output stream.
 struct SortMergeJoinStream {
-    /// Current state of the stream
-    pub state: SortMergeJoinState,
+    // ========================================================================
+    // PROPERTIES:
+    // These fields are initialized at the start and remain constant throughout
+    // the execution.
+    // ========================================================================
     /// Output schema
     pub schema: SchemaRef,
-    /// Sort options of join columns used to sort streamed and buffered data stream
-    pub sort_options: Vec<SortOptions>,
     /// null == null?
     pub null_equals_null: bool,
+    /// Sort options of join columns used to sort streamed and buffered data stream
+    pub sort_options: Vec<SortOptions>,
+    /// optional join filter
+    pub filter: Option<JoinFilter>,
+    /// How the join is performed
+    pub join_type: JoinType,
+    /// Target output batch size
+    pub batch_size: usize,
+
+    // ========================================================================
+    // STREAMED FIELDS:
+    // These fields manage the properties and state of the streamed input.
+    // ========================================================================
     /// Input schema of streamed
     pub streamed_schema: SchemaRef,
-    /// Input schema of buffered
-    pub buffered_schema: SchemaRef,
     /// Streamed data stream
     pub streamed: SendableRecordBatchStream,
-    /// Buffered data stream
-    pub buffered: SendableRecordBatchStream,
     /// Current processing record batch of streamed
     pub streamed_batch: StreamedBatch,
-    /// Current buffered data
-    pub buffered_data: BufferedData,
     /// (used in outer join) Is current streamed row joined at least once?
     pub streamed_joined: bool,
-    /// (used in outer join) Is current buffered batches joined at least once?
-    pub buffered_joined: bool,
     /// State of streamed
     pub streamed_state: StreamedState,
-    /// State of buffered
-    pub buffered_state: BufferedState,
-    /// The comparison result of current streamed row and buffered batches
-    pub current_ordering: Ordering,
     /// Join key columns of streamed
     pub on_streamed: Vec<PhysicalExprRef>,
+
+    // ========================================================================
+    // BUFFERED FIELDS:
+    // These fields manage the properties and state of the buffered input.
+    // ========================================================================
+    /// Input schema of buffered
+    pub buffered_schema: SchemaRef,
+    /// Buffered data stream
+    pub buffered: SendableRecordBatchStream,
+    /// Current buffered data
+    pub buffered_data: BufferedData,
+    /// (used in outer join) Is current buffered batches joined at least once?
+    pub buffered_joined: bool,
+    /// State of buffered
+    pub buffered_state: BufferedState,
     /// Join key columns of buffered
     pub on_buffered: Vec<PhysicalExprRef>,
-    /// optional join filter
-    pub filter: Option<JoinFilter>,
+
+    // ========================================================================
+    // MERGE JOIN STATES:
+    // These fields track the execution state of merge join and are updated
+    // during the execution.
+    // ========================================================================
+    /// Current state of the stream
+    pub state: SortMergeJoinState,
     /// Staging output array builders
     pub staging_output_record_batches: JoinedRecordBatches,
     /// Output buffer. Currently used by filtering as it requires double buffering
@@ -868,18 +891,21 @@ struct SortMergeJoinStream {
     /// Increased when we put rows into buffer and decreased after we actually output batches.
     /// Used to trigger output when sufficient rows are ready
     pub output_size: usize,
-    /// Target output batch size
-    pub batch_size: usize,
-    /// How the join is performed
-    pub join_type: JoinType,
+    /// The comparison result of current streamed row and buffered batches
+    pub current_ordering: Ordering,
+    /// Manages the process of spilling and reading back intermediate data
+    pub spill_manager: SpillManager,
+
+    // ========================================================================
+    // EXECUTION RESOURCES:
+    // Fields related to managing execution resources and monitoring performance.
+    // ========================================================================
     /// Metrics
     pub join_metrics: SortMergeJoinMetrics,
     /// Memory reservation
     pub reservation: MemoryReservation,
     /// Runtime env
     pub runtime_env: Arc<RuntimeEnv>,
-    /// Manages the process of spilling and reading back intermediate data
-    pub spill_manager: SpillManager,
     /// A unique number for each batch
     pub streamed_batch_counter: AtomicUsize,
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to