This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-24675
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 601bdddc349c8472bf15b764d47163642cd38ad9
Author: amashenkov <[email protected]>
AuthorDate: Fri Feb 28 16:50:50 2025 +0300

    Fix HashJoin node hanging for RIGHT and FULL OUTER joins
---
 .../internal/sql/engine/exec/rel/HashJoinNode.java | 47 +++++++++++++++++++---
 .../sql/engine/exec/rel/HashJoinExecutionTest.java |  2 -
 2 files changed, 41 insertions(+), 8 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinNode.java
index bf8c88b1042..0e26a93e55c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinNode.java
@@ -81,8 +81,8 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo
         assert leftJoinPositions.length == rightJoinPositions.length;
 
         this.outputRowFactory = outputRowFactory;
-        this.nonEquiCondition = nonEquiCondition != null 
-                ? nonEquiCondition 
+        this.nonEquiCondition = nonEquiCondition != null
+                ? nonEquiCondition
                 : cast(ALWAYS_TRUE);
     }
 
@@ -161,6 +161,7 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo
                 inLoop = true;
                 try {
                     while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
+                        // Proceed with next left row, if previous was fully processed.
                         if (!rightIt.hasNext()) {
                             left = leftInBuf.remove();
 
@@ -170,6 +171,7 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo
                         }
 
                         if (rightIt.hasNext()) {
+                            // Emits matched rows.
                             while (rightIt.hasNext()) {
                                 checkState();
 
@@ -238,11 +240,13 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo
                     while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
                         checkState();
 
+                        // Proceed with next left row, if previous was fully processed.
                         if (!rightIt.hasNext()) {
                             left = leftInBuf.remove();
 
                             Collection<RowT> rightRows = lookup(left);
 
+                            // Emit unmatched left row.
                             if (rightRows.isEmpty()) {
                                 requested--;
                                 downstream().push(outputRowFactory.concat(left, rightRowFactory.create()));
@@ -252,6 +256,7 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo
                         }
 
                         if (rightIt.hasNext()) {
+                            // Emits matched rows.
                             while (rightIt.hasNext()) {
                                 checkState();
 
@@ -285,6 +290,8 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo
         /** Left row factory. */
         private final RowHandler.RowFactory<RowT> leftRowFactory;
 
+        private boolean drainMaterialization;
+
         /**
          * Creates HashJoinNode for RIGHT OUTER JOIN operator.
          *
@@ -307,6 +314,14 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo
             this.leftRowFactory = leftRowFactory;
         }
 
+        /** {@inheritDoc} */
+        @Override
+        protected void rewindInternal() {
+            drainMaterialization = false;
+
+            super.rewindInternal();
+        }
+
         @Override
         protected void join() throws Exception {
             if (waitingRight == NOT_WAITING) {
@@ -315,6 +330,7 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo
                     while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
                         checkState();
 
+                        // Proceed with next left row, if previous was fully processed.
                         if (!rightIt.hasNext()) {
                             left = leftInBuf.remove();
 
@@ -324,6 +340,7 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo
                         }
 
                         if (rightIt.hasNext()) {
+                            // Emits matched rows.
                             while (rightIt.hasNext()) {
                                 checkState();
 
@@ -349,10 +366,13 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo
                 }
             }
 
+            // Emit unmatched right rows.
             if (left == null && leftInBuf.isEmpty() && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && requested > 0) {
                 inLoop = true;
                 try {
-                    if (!rightIt.hasNext()) {
+                    if (!rightIt.hasNext() && !drainMaterialization) {
+                        // Prevent scanning store more than once.
+                        drainMaterialization = true;
                         rightIt = getUntouched(hashStore);
                     }
 
@@ -391,6 +411,8 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo
         /** Right row factory. */
         private final RowHandler.RowFactory<RowT> rightRowFactory;
 
+        private boolean drainMaterialization;
+
         /**
          * Creates HashJoinNode for FULL OUTER JOIN operator.
          *
@@ -416,6 +438,14 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo
             this.rightRowFactory = rightRowFactory;
         }
 
+        /** {@inheritDoc} */
+        @Override
+        protected void rewindInternal() {
+            drainMaterialization = false;
+
+            super.rewindInternal();
+        }
+
         /** {@inheritDoc} */
         @Override
         protected void join() throws Exception {
@@ -425,11 +455,13 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo
                     while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
                         checkState();
 
+                        // Proceed with next left row, if previous was fully processed.
                         if (!rightIt.hasNext()) {
                             left = leftInBuf.remove();
 
                             Collection<RowT> rightRows = lookup(left);
 
+                            // Emit unmatched left row.
                             if (rightRows.isEmpty()) {
                                 requested--;
                                 downstream().push(outputRowFactory.concat(left, rightRowFactory.create()));
@@ -439,6 +471,7 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo
                         }
 
                         if (rightIt.hasNext()) {
+                            // Emits matched rows.
                             while (rightIt.hasNext()) {
                                 checkState();
 
@@ -464,11 +497,13 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo
                 }
             }
 
-            if (left == null && !rightIt.hasNext() && leftInBuf.isEmpty() && waitingLeft == NOT_WAITING
-                    && waitingRight == NOT_WAITING && requested > 0) {
+            // Emit unmatched right rows.
+            if (left == null && leftInBuf.isEmpty() && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && requested > 0) {
                 inLoop = true;
                 try {
-                    if (!rightIt.hasNext()) {
+                    if (!rightIt.hasNext() && !drainMaterialization) {
+                        // Prevent scanning store more than once.
+                        drainMaterialization = true;
                         rightIt = getUntouched(hashStore);
                     }
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java
index 3c031d39796..6ea0c535c74 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java
@@ -186,8 +186,6 @@ public class HashJoinExecutionTest extends AbstractJoinExecutionTest {
     @ParameterizedTest
     @EnumSource(JoinRelType.class)
     void checkHashJoinNodeWithDifferentBufferSize(JoinRelType joinType) {
-        Assumptions.assumeFalse(joinType == RIGHT, "RIGHT join type is buggy");
-        Assumptions.assumeFalse(joinType == FULL, "FULL join type is buggy");
         Assumptions.assumeFalse(joinType == ANTI, "ANTI join type is buggy");
 
         validateDistinctData(executionContext(1), joinType, 0, 0);

Reply via email to