This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new bf8b18da298 IGNITE-27660 SQL Calcite: Fix usage of checkState() in the 
join nodes - Fixes #12680.
bf8b18da298 is described below

commit bf8b18da2987a8bd341536354dafeb9b6f51f34c
Author: Vladimir Steshin <[email protected]>
AuthorDate: Fri Feb 6 09:19:13 2026 +0300

    IGNITE-27660 SQL Calcite: Fix usage of checkState() in the join nodes - 
Fixes #12680.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../rel/AbstractRightMaterializedJoinNode.java     |  1 +
 .../exec/rel/CorrelatedNestedLoopJoinNode.java     | 37 +++++++------
 .../query/calcite/exec/rel/HashJoinNode.java       |  5 +-
 .../query/calcite/exec/rel/MergeJoinNode.java      | 61 +++++++++++++---------
 .../query/calcite/exec/rel/NestedLoopJoinNode.java | 57 ++++++++++++--------
 5 files changed, 98 insertions(+), 63 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java
index 7dce770bcae..27c25f24e58 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java
@@ -81,6 +81,7 @@ public abstract class AbstractRightMaterializedJoinNode<Row> 
extends MemoryTrack
         waitingLeft = 0;
         waitingRight = 0;
         left = null;
+        processed = 0;
 
         leftInBuf.clear();
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
index b1d78912d49..8860cc10570 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.function.BiPredicate;
-
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.type.RelDataType;
@@ -77,6 +76,9 @@ public class CorrelatedNestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
     /** */
     private int rightIdx;
 
+    /** */
+    private int processed;
+
     /** */
     private Row rightEmptyRow;
 
@@ -129,8 +131,6 @@ public class CorrelatedNestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
         assert !F.isEmpty(sources()) && sources().size() == 2;
         assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", 
requested=" + requested;
 
-        checkState();
-
         requested = rowsCnt;
 
         onRequest();
@@ -196,8 +196,6 @@ public class CorrelatedNestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
         assert downstream() != null;
         assert waitingLeft > 0;
 
-        checkState();
-
         waitingLeft--;
 
         if (leftInBuf == null)
@@ -213,8 +211,6 @@ public class CorrelatedNestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
         assert downstream() != null;
         assert waitingRight > 0;
 
-        checkState();
-
         waitingRight--;
 
         if (rightInBuf == null)
@@ -269,9 +265,8 @@ public class CorrelatedNestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
                 assert F.isEmpty(rightInBuf);
 
                 context().execute(() -> {
-                    checkState();
-
                     state = State.FILLING_LEFT;
+
                     leftSource().request(waitingLeft = leftInBufferSize);
                 }, this::onError);
 
@@ -282,11 +277,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
                 assert waitingRight == -1 || waitingRight == 0 && 
rightInBuf.size() == rightInBufferSize;
                 assert waitingLeft == -1 || waitingLeft == 0 && 
leftInBuf.size() == leftInBufferSize;
 
-                context().execute(() -> {
-                    checkState();
-
-                    join();
-                }, this::onError);
+                context().execute(this::join0, this::onError);
 
                 break;
 
@@ -308,6 +299,8 @@ public class CorrelatedNestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
         if (leftInBuf.size() == leftInBufferSize) {
             assert waitingLeft == 0;
 
+            checkState();
+
             prepareCorrelations();
 
             if (waitingRight == -1)
@@ -384,7 +377,11 @@ public class CorrelatedNestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
                     leftIdx = 0;
 
                 while (requested > 0 && leftIdx < leftInBuf.size()) {
-                    checkState();
+                    if (processed++ > IN_BUFFER_SIZE) {
+                        context().execute(this::join0, this::onError);
+
+                        return;
+                    }
 
                     Row left = leftInBuf.get(leftIdx);
                     Row right = rightInBuf.get(rightIdx);
@@ -434,6 +431,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
 
                 try {
                     while (requested > 0 && notMatchedIdx < leftInBuf.size()) {
+                        processed++;
                         requested--;
 
                         
downstream().push(handler.concat(leftInBuf.get(notMatchedIdx), rightEmptyRow));
@@ -500,4 +498,13 @@ public class CorrelatedNestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
             context().setCorrelated(row, correlationIds.get(i).getId());
         }
     }
+
+    /** */
+    private void join0() throws Exception {
+        checkState();
+
+        processed = 0;
+
+        join();
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java
index f845fb21d95..c3731b26037 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java
@@ -198,8 +198,11 @@ public abstract class HashJoinNode<Row> extends 
AbstractRightMaterializedJoinNod
                 hashStore.computeIfAbsent(key, k -> createRowList()).add(row);
             }
 
-            if (waitingRight == 0)
+            if (waitingRight == 0) {
+                checkState();
+
                 rightSource().request(waitingRight = IN_BUFFER_SIZE);
+            }
         }
 
         /** */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
index 1f3f97e40d7..1c4d902c596 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
@@ -77,6 +77,9 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
     /** */
     protected boolean inLoop;
 
+    /** */
+    private int processed;
+
     /**
      * Flag indicating that at least one of the inputs has exchange 
underneath. In this case we can't prematurely end
      * downstream if one of the inputs is drained, we need to wait for both 
inputs, since async message from remote
@@ -107,24 +110,16 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
         assert !F.isEmpty(sources()) && sources().size() == 2;
         assert rowsCnt > 0 && requested == 0;
 
-        checkState();
-
         requested = rowsCnt;
 
         if (!inLoop)
-            context().execute(this::doJoin, this::onError);
-    }
-
-    /** */
-    private void doJoin() throws Exception {
-        checkState();
-
-        join();
+            context().execute(this::join0, this::onError);
     }
 
     /** {@inheritDoc} */
     @Override protected void rewindInternal() {
         requested = 0;
+        processed = 0;
         waitingLeft = 0;
         waitingRight = 0;
 
@@ -184,8 +179,6 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
         assert downstream() != null;
         assert waitingLeft > 0;
 
-        checkState();
-
         waitingLeft--;
 
         if (!finishing)
@@ -199,8 +192,6 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
         assert downstream() != null;
         assert waitingRight > 0;
 
-        checkState();
-
         waitingRight--;
 
         if (!finishing)
@@ -214,8 +205,6 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
         assert downstream() != null;
         assert waitingLeft > 0;
 
-        checkState();
-
         waitingLeft = NOT_WAITING;
 
         join();
@@ -226,8 +215,6 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
         assert downstream() != null;
         assert waitingRight > 0;
 
-        checkState();
-
         waitingRight = NOT_WAITING;
 
         join();
@@ -339,7 +326,8 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
             try {
                 while (requested > 0 && (left != null || !leftInBuf.isEmpty()) 
&& (right != null || !rightInBuf.isEmpty()
                     || rightMaterialization != null)) {
-                    checkState();
+                    if (rescheduleJoin())
+                        return;
 
                     if (left == null)
                         left = leftInBuf.remove();
@@ -469,7 +457,8 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
             try {
                 while (requested > 0 && (left != null || !leftInBuf.isEmpty()) 
&& (right != null || !rightInBuf.isEmpty()
                     || rightMaterialization != null || waitingRight == 
NOT_WAITING)) {
-                    checkState();
+                    if (rescheduleJoin())
+                        return;
 
                     if (left == null) {
                         left = leftInBuf.remove();
@@ -622,7 +611,8 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
             try {
                 while (requested > 0 && !(left == null && leftInBuf.isEmpty() 
&& waitingLeft != NOT_WAITING)
                     && (right != null || !rightInBuf.isEmpty() || 
rightMaterialization != null)) {
-                    checkState();
+                    if (rescheduleJoin())
+                        return;
 
                     if (left == null && !leftInBuf.isEmpty())
                         left = leftInBuf.remove();
@@ -796,7 +786,8 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
             try {
                 while (requested > 0 && !(left == null && leftInBuf.isEmpty() 
&& waitingLeft != NOT_WAITING)
                     && !(right == null && rightInBuf.isEmpty() && 
rightMaterialization == null && waitingRight != NOT_WAITING)) {
-                    checkState();
+                    if (rescheduleJoin())
+                        return;
 
                     if (left == null && !leftInBuf.isEmpty()) {
                         left = leftInBuf.remove();
@@ -975,7 +966,8 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
             inLoop = true;
             try {
                 while (requested > 0 && (left != null || !leftInBuf.isEmpty()) 
&& (right != null || !rightInBuf.isEmpty())) {
-                    checkState();
+                    if (rescheduleJoin())
+                        return;
 
                     if (left == null)
                         left = leftInBuf.remove();
@@ -1031,7 +1023,8 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
             try {
                 while (requested > 0 && (left != null || !leftInBuf.isEmpty()) 
&&
                     !(right == null && rightInBuf.isEmpty() && waitingRight != 
NOT_WAITING)) {
-                    checkState();
+                    if (rescheduleJoin())
+                        return;
 
                     if (left == null)
                         left = leftInBuf.remove();
@@ -1070,4 +1063,24 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
             tryToRequestInputs();
         }
     }
+
+    /** */
+    private void join0() throws Exception {
+        checkState();
+
+        processed = 0;
+
+        join();
+    }
+
+    /** */
+    protected boolean rescheduleJoin() {
+        if (processed++ > IN_BUFFER_SIZE) {
+            context().execute(this::join0, this::onError);
+
+            return true;
+        }
+
+        return false;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
index f2f42e3b68d..51b6d2b8518 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
@@ -66,16 +66,17 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractRightMaterializedJ
         assert downstream() != null;
         assert waitingRight > 0;
 
-        checkState();
-
         nodeMemoryTracker.onRowAdded(row);
 
         waitingRight--;
 
         rightMaterialized.add(row);
 
-        if (waitingRight == 0)
+        if (waitingRight == 0) {
+            checkState();
+
             rightSource().request(waitingRight = IN_BUFFER_SIZE);
+        }
     }
 
     /** */
@@ -138,7 +139,8 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractRightMaterializedJ
                             left = leftInBuf.remove();
 
                         while (requested > 0 && rightIdx < 
rightMaterialized.size()) {
-                            checkState();
+                            if (rescheduleJoin())
+                                return;
 
                             if (!cond.test(left, 
rightMaterialized.get(rightIdx++)))
                                 continue;
@@ -206,7 +208,8 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractRightMaterializedJ
                         }
 
                         while (requested > 0 && rightIdx < 
rightMaterialized.size()) {
-                            checkState();
+                            if (rescheduleJoin())
+                                return;
 
                             if (!cond.test(left, 
rightMaterialized.get(rightIdx++)))
                                 continue;
@@ -294,7 +297,8 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractRightMaterializedJ
                             left = leftInBuf.remove();
 
                         while (requested > 0 && rightIdx < 
rightMaterialized.size()) {
-                            checkState();
+                            if (rescheduleJoin())
+                                return;
 
                             Row right = rightMaterialized.get(rightIdx++);
 
@@ -323,16 +327,16 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractRightMaterializedJ
                 assert lastPushedInd >= 0;
 
                 inLoop = true;
+                lastPushedInd = 
rightNotMatchedIndexes.nextSetBit(lastPushedInd);
+
                 try {
-                    for (lastPushedInd = 
rightNotMatchedIndexes.nextSetBit(lastPushedInd);;
-                        lastPushedInd = 
rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1)
-                    ) {
-                        checkState();
+                    Row emptyLeft = lastPushedInd < 0 ? null : 
leftRowFactory.create();
 
-                        if (lastPushedInd < 0)
-                            break;
+                    while (lastPushedInd >= 0) {
+                        if (rescheduleJoin())
+                            return;
 
-                        Row row = rowHnd.concat(leftRowFactory.create(), 
rightMaterialized.get(lastPushedInd));
+                        Row row = rowHnd.concat(emptyLeft, 
rightMaterialized.get(lastPushedInd));
 
                         rightNotMatchedIndexes.clear(lastPushedInd);
 
@@ -341,6 +345,8 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractRightMaterializedJ
 
                         if (lastPushedInd == Integer.MAX_VALUE || requested <= 
0)
                             break;
+
+                        lastPushedInd = 
rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1);
                     }
                 }
                 finally {
@@ -415,7 +421,8 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractRightMaterializedJ
                         }
 
                         while (requested > 0 && rightIdx < 
rightMaterialized.size()) {
-                            checkState();
+                            if (rescheduleJoin())
+                                return;
 
                             Row right = rightMaterialized.get(rightIdx++);
 
@@ -456,16 +463,16 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractRightMaterializedJ
                 assert lastPushedInd >= 0;
 
                 inLoop = true;
+                lastPushedInd = 
rightNotMatchedIndexes.nextSetBit(lastPushedInd);
+
                 try {
-                    for (lastPushedInd = 
rightNotMatchedIndexes.nextSetBit(lastPushedInd);;
-                        lastPushedInd = 
rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1)
-                    ) {
-                        checkState();
+                    Row emptyLeft = lastPushedInd < 0 ? null : 
leftRowFactory.create();
 
-                        if (lastPushedInd < 0)
-                            break;
+                    while (lastPushedInd >= 0) {
+                        if (rescheduleJoin())
+                            return;
 
-                        Row row = rowHnd.concat(leftRowFactory.create(), 
rightMaterialized.get(lastPushedInd));
+                        Row row = rowHnd.concat(emptyLeft, 
rightMaterialized.get(lastPushedInd));
 
                         rightNotMatchedIndexes.clear(lastPushedInd);
 
@@ -474,6 +481,8 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractRightMaterializedJ
 
                         if (lastPushedInd == Integer.MAX_VALUE || requested <= 
0)
                             break;
+
+                        lastPushedInd = 
rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1);
                     }
                 }
                 finally {
@@ -505,7 +514,8 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractRightMaterializedJ
                     boolean matched = false;
 
                     while (!matched && requested > 0 && rightIdx < 
rightMaterialized.size()) {
-                        checkState();
+                        if (rescheduleJoin())
+                            return;
 
                         if (!cond.test(left, 
rightMaterialized.get(rightIdx++)))
                             continue;
@@ -549,7 +559,8 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractRightMaterializedJ
                         boolean matched = false;
 
                         while (!matched && rightIdx < 
rightMaterialized.size()) {
-                            checkState();
+                            if (rescheduleJoin())
+                                return;
 
                             if (cond.test(left, 
rightMaterialized.get(rightIdx++)))
                                 matched = true;

Reply via email to