This is an automated email from the ASF dual-hosted git repository.

gengliangwang pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new cf3f659a695a [SPARK-57025][SQL] SortMergeJoinExec: extract 
JoinHelper.resetMatched for full-outer BitSet bookkeeping
cf3f659a695a is described below

commit cf3f659a695a3bc2e83922a39aad61c4fde65673
Author: Gengliang Wang <[email protected]>
AuthorDate: Sat May 23 21:01:16 2026 -0700

    [SPARK-57025][SQL] SortMergeJoinExec: extract JoinHelper.resetMatched for 
full-outer BitSet bookkeeping
    
    ### What changes were proposed in this pull request?
    
    This is a sub-task of 
[SPARK-56908](https://issues.apache.org/jira/browse/SPARK-56908).
    
    `SortMergeJoinExec.codegenFullOuter` and its interpreted counterpart 
`SortMergeFullOuterJoinScanner.findMatchingRows` both duplicate the "reuse the 
`BitSet` if its capacity is enough, otherwise allocate a new one" idiom for 
tracking matched left/right rows.
    
    Extract it into a new static helper class at 
`sql/core/src/main/java/org/apache/spark/sql/execution/joins/JoinHelper.java`:
    
    ```java
    public static BitSet resetMatched(BitSet matched, int bufferSize) { ... }
    ```
    
    and call it from the four sites (left + right in each method).
    
    ### Why are the changes needed?
    
    - Replaces ~20 inline lines (four duplicated if/else blocks) with 4 helper calls, shrinking 
the generated Java for the codegen sites.
    - Keeps the codegen and interpreted full-outer paths in lockstep so a 
future change to the reset rule lands in one place.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing `OuterJoinSuite` covers full-outer SMJ through both code paths 
(codegen on and off).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code
    
    Closes #56073 from gengliangwang/SPARK-57025-resetMatched.
    
    Authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
    (cherry picked from commit b2c2a8d68dcbbaca715adc74c0dd543582c9ff02)
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../spark/sql/execution/joins/JoinHelper.java      | 47 ++++++++++++++++++++++
 .../sql/execution/joins/SortMergeJoinExec.scala    | 26 +++---------
 2 files changed, 53 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/joins/JoinHelper.java b/sql/core/src/main/java/org/apache/spark/sql/execution/joins/JoinHelper.java
new file mode 100644
index 000000000000..91156b2600fd
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/joins/JoinHelper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins;
+
+import org.apache.spark.util.collection.BitSet;
+
+/**
+ * Static helpers shared by join operators in this package and called from both whole-stage
+ * generated code and interpreted execution paths. Hoisting recurring snippets here shrinks the
+ * generated Java source and lets the JIT compile each body once instead of once per stage.
+ */
+public final class JoinHelper {
+
+  private JoinHelper() {}
+
+  /**
+   * Resets a Spark {@link org.apache.spark.util.collection.BitSet} (not {@link java.util.BitSet})
+   * tracking which rows of a full-outer sort-merge join's per-key buffer were matched.
+   *
+   * @param matched the bit set the caller currently holds
+   * @param bufferSize number of rows now in the buffer
+   * @return {@code matched} with bits below {@code bufferSize} cleared when its capacity suffices,
+   *     otherwise a new BitSet; callers must assign the result back to their bit-set field
+   */
+  public static BitSet resetMatched(BitSet matched, int bufferSize) {
+    if (bufferSize > matched.capacity()) {
+      return new BitSet(bufferSize);
+    }
+    matched.clearUntil(bufferSize);
+    return matched;
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index bc2f9197df9d..985fc518742c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -954,16 +954,10 @@ case class SortMergeJoinExec(
          |  }
          |
          |  // Reset bit sets of buffers accordingly
-         |  if ($leftBuffer.size() <= $leftMatched.capacity()) {
-         |    $leftMatched.clearUntil($leftBuffer.size());
-         |  } else {
-         |    $leftMatched = new $matchedClsName($leftBuffer.size());
-         |  }
-         |  if ($rightBuffer.size() <= $rightMatched.capacity()) {
-         |    $rightMatched.clearUntil($rightBuffer.size());
-         |  } else {
-         |    $rightMatched = new $matchedClsName($rightBuffer.size());
-         |  }
+         |  $leftMatched = ${classOf[JoinHelper].getName}.resetMatched(
+         |    $leftMatched, $leftBuffer.size());
+         |  $rightMatched = ${classOf[JoinHelper].getName}.resetMatched(
+         |    $rightMatched, $rightBuffer.size());
          |}
        """.stripMargin)
 
@@ -1457,16 +1451,8 @@ private class SortMergeFullOuterJoinScanner(
       advancedRight()
     }
 
-    if (leftMatches.size <= leftMatched.capacity) {
-      leftMatched.clearUntil(leftMatches.size)
-    } else {
-      leftMatched = new BitSet(leftMatches.size)
-    }
-    if (rightMatches.size <= rightMatched.capacity) {
-      rightMatched.clearUntil(rightMatches.size)
-    } else {
-      rightMatched = new BitSet(rightMatches.size)
-    }
+    leftMatched = JoinHelper.resetMatched(leftMatched, leftMatches.size)
+    rightMatched = JoinHelper.resetMatched(rightMatched, rightMatches.size)
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to