metesynnada commented on code in PR #8020:
URL: https://github.com/apache/arrow-datafusion/pull/8020#discussion_r1384453471


##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -747,6 +753,97 @@ where
     Ok(())
 }
 
+// State for storing left/right side indices used for partial batch output
+// & producing ranges for adjusting indices
+#[derive(Debug, Default)]
+pub(crate) struct HashJoinStreamState {
+    // total rows in current probe batch
+    probe_rows: usize,
+    // saved probe-build indices to resume matching from
+    last_matched_indices: Option<(usize, usize)>,
+    // current iteration has been updated
+    matched_indices_updated: bool,
+    // tracking last joined probe side index seen for further indices adjustment
+    last_joined_probe_index: Option<usize>,
+    // tracking last joined probe side index seen for further indices adjustment
+    prev_joined_probe_index: Option<usize>,
+}
+
+impl HashJoinStreamState {

Review Comment:
   I strongly advise making this an enum; this is a common approach in other 
executors as well.
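
   To make the suggestion concrete, here is a minimal sketch of an enum-based 
state. The variant names and the `on_batch_emitted` helper are hypothetical, 
chosen only for illustration; they are not taken from this PR or from any 
existing executor. Only `probe_rows` and `last_matched_indices` are borrowed 
from the struct above.

```rust
// Hypothetical sketch: variant names and the helper below are illustrative
// assumptions, not code from the PR or from DataFusion.
#[derive(Debug, Clone, PartialEq)]
enum HashJoinStreamState {
    /// No probe batch in flight; the stream should fetch the next one.
    FetchProbeBatch,
    /// Partway through a probe batch; resume matching from the saved
    /// (probe, build) indices, if any.
    ProcessProbeBatch {
        probe_rows: usize,
        last_matched_indices: Option<(usize, usize)>,
    },
    /// Probe side exhausted; nothing more to emit.
    Completed,
}

impl HashJoinStreamState {
    /// Advance the state after one partial output batch has been emitted.
    /// `resume_from` is `Some((probe_idx, build_idx))` when the current probe
    /// batch still has matches to emit, and `None` once it is fully consumed.
    fn on_batch_emitted(self, resume_from: Option<(usize, usize)>) -> Self {
        match (self, resume_from) {
            // More matches remain: stay in the same batch, remember where to resume.
            (Self::ProcessProbeBatch { probe_rows, .. }, Some(indices)) => {
                Self::ProcessProbeBatch {
                    probe_rows,
                    last_matched_indices: Some(indices),
                }
            }
            // Current batch is done: go back to fetching.
            (Self::ProcessProbeBatch { .. }, None) => Self::FetchProbeBatch,
            // Other states are unaffected by this event.
            (other, _) => other,
        }
    }
}
```

   With this shape, each phase carries only the fields it needs, so a flag 
such as `matched_indices_updated` may become unnecessary because the transition 
itself records the update.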



##########
datafusion/physical-plan/src/joins/symmetric_hash_join.rs:
##########
@@ -587,6 +590,8 @@ struct SymmetricHashJoinStream {
     reservation: SharedMemoryReservation,
     /// Flag indicating whether there is nothing to process anymore
     final_result: bool,
+    /// Stream state for compatibility with HashJoinExec
+    state: HashJoinStreamState,

Review Comment:
   I did not understand how this is used in SymmetricHashJoin. Can you explain 
it a bit further?



##########
datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt:
##########
@@ -72,11 +72,11 @@ SELECT t1.a, t1.b, t1.c, t2.a as a2
  ON t1.d = t2.d ORDER BY a2, t2.b
  LIMIT 5
 ----
-0 0 0 0
-0 0 2 0
-0 0 3 0
-0 0 6 0
-0 0 20 0
+1 3 95 0

Review Comment:
   I believe preserving the order of the build side is important for downstream 
aggregations: it lets us avoid unnecessary sorting by leveraging the build-side 
lexical order. This pattern is common in DataFusion and warrants investing in 
order preservation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.