metesynnada commented on code in PR #8020:
URL: https://github.com/apache/arrow-datafusion/pull/8020#discussion_r1384453471
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -747,6 +753,97 @@ where
Ok(())
}
+// State for storing left/right side indices used for partial batch output
+// & producing ranges for adjusting indices
+#[derive(Debug, Default)]
+pub(crate) struct HashJoinStreamState {
+ // total rows in current probe batch
+ probe_rows: usize,
+ // saved probe-build indices to resume matching from
+ last_matched_indices: Option<(usize, usize)>,
+ // current iteration has been updated
+ matched_indices_updated: bool,
+ // tracking last joined probe side index seen for further indices adjustment
+ last_joined_probe_index: Option<usize>,
+ // tracking last joined probe side index seen for further indices adjustment
+ prev_joined_probe_index: Option<usize>,
+}
+
+impl HashJoinStreamState {
Review Comment:
I strongly advise making this an enum; this is a common approach in other
executors as well.
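To make the suggestion concrete, here is a minimal sketch of an enum-based state.
It is only an illustration under assumptions: the names `ProbeState`, `start`,
`advance`, and `is_done` are hypothetical and do not exist in this PR or in
DataFusion, and the sketch covers only the resume position, not the
index-adjustment fields.

```rust
/// Hypothetical enum replacing the `Option`/`bool` fields of the struct above.
/// Each variant carries only the data that is valid in that state.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ProbeState {
    /// No probe batch is currently being processed.
    #[default]
    Idle,
    /// Matching stopped early; resume from this (probe, build) index pair.
    Partial {
        probe_rows: usize,
        resume_from: (usize, usize),
    },
    /// Every row of the current probe batch has been matched.
    Exhausted { probe_rows: usize },
}

impl ProbeState {
    /// Begin processing a probe batch with `probe_rows` rows.
    pub fn start(probe_rows: usize) -> Self {
        if probe_rows == 0 {
            ProbeState::Exhausted { probe_rows }
        } else {
            ProbeState::Partial {
                probe_rows,
                resume_from: (0, 0),
            }
        }
    }

    /// Record the outcome of one matching step: `Some(pair)` means output was
    /// cut short and must resume at `pair`; `None` means the batch is finished.
    /// States other than `Partial` are returned unchanged.
    pub fn advance(self, next: Option<(usize, usize)>) -> Self {
        match (self, next) {
            (ProbeState::Partial { probe_rows, .. }, Some(resume_from)) => {
                ProbeState::Partial {
                    probe_rows,
                    resume_from,
                }
            }
            (ProbeState::Partial { probe_rows, .. }, None) => {
                ProbeState::Exhausted { probe_rows }
            }
            (other, _) => other,
        }
    }

    /// True when a new probe batch should be fetched.
    pub fn is_done(&self) -> bool {
        matches!(self, ProbeState::Idle | ProbeState::Exhausted { .. })
    }
}
```

With this shape, a combination such as "updated flag set but no saved indices"
cannot be represented, and the stream's poll loop can `match` on the state
instead of checking several fields.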
##########
datafusion/physical-plan/src/joins/symmetric_hash_join.rs:
##########
@@ -587,6 +590,8 @@ struct SymmetricHashJoinStream {
reservation: SharedMemoryReservation,
/// Flag indicating whether there is nothing to process anymore
final_result: bool,
+ /// Stream state for compatibility with HashJoinExec
+ state: HashJoinStreamState,
Review Comment:
I did not understand how this is used in SymmetricHashJoin. Can you explain
it a bit further?
##########
datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt:
##########
@@ -72,11 +72,11 @@ SELECT t1.a, t1.b, t1.c, t2.a as a2
ON t1.d = t2.d ORDER BY a2, t2.b
LIMIT 5
----
-0 0 0 0
-0 0 2 0
-0 0 3 0
-0 0 6 0
-0 0 20 0
+1 3 95 0
Review Comment:
I believe preserving the order of the build side is important for further
aggregations. We can avoid unnecessary sorting by leveraging the build-side
lexical order. This pattern is common in DataFusion, so investing in order
preservation is worthwhile.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]