This is an automated email from the ASF dual-hosted git repository. zstan pushed a commit to branch ignite-2.18 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit fbd59d2f6f4ee9ad8e3c1b64ac61aecef211065e Author: Vladimir Steshin <[email protected]> AuthorDate: Fri Feb 6 09:19:13 2026 +0300 IGNITE-27660 SQL Calcite: Fix usage of checkState() in the join nodes - Fixes #12680. Signed-off-by: Aleksey Plekhanov <[email protected]> (cherry picked from commit bf8b18da2987a8bd341536354dafeb9b6f51f34c) --- .../rel/AbstractRightMaterializedJoinNode.java | 1 + .../exec/rel/CorrelatedNestedLoopJoinNode.java | 37 +++++++------ .../query/calcite/exec/rel/HashJoinNode.java | 5 +- .../query/calcite/exec/rel/MergeJoinNode.java | 61 +++++++++++++--------- .../query/calcite/exec/rel/NestedLoopJoinNode.java | 57 ++++++++++++-------- 5 files changed, 98 insertions(+), 63 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java index 7dce770bcae..27c25f24e58 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java @@ -81,6 +81,7 @@ public abstract class AbstractRightMaterializedJoinNode<Row> extends MemoryTrack waitingLeft = 0; waitingRight = 0; left = null; + processed = 0; leftInBuf.clear(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java index b1d78912d49..8860cc10570 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.List; import java.util.Set; import java.util.function.BiPredicate; - import org.apache.calcite.rel.core.CorrelationId; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.type.RelDataType; @@ -77,6 +76,9 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { /** */ private int rightIdx; + /** */ + private int processed; + /** */ private Row rightEmptyRow; @@ -129,8 +131,6 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { assert !F.isEmpty(sources()) && sources().size() == 2; assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested; - checkState(); - requested = rowsCnt; onRequest(); @@ -196,8 +196,6 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { assert downstream() != null; assert waitingLeft > 0; - checkState(); - waitingLeft--; if (leftInBuf == null) @@ -213,8 +211,6 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { assert downstream() != null; assert waitingRight > 0; - checkState(); - waitingRight--; if (rightInBuf == null) @@ -269,9 +265,8 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { assert F.isEmpty(rightInBuf); context().execute(() -> { - checkState(); - state = State.FILLING_LEFT; + leftSource().request(waitingLeft = leftInBufferSize); }, this::onError); @@ -282,11 +277,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { assert waitingRight == -1 || waitingRight == 0 && rightInBuf.size() == rightInBufferSize; assert waitingLeft == -1 || waitingLeft == 0 && leftInBuf.size() == leftInBufferSize; - context().execute(() -> { - checkState(); - - join(); - }, this::onError); + context().execute(this::join0, this::onError); break; @@ -308,6 +299,8 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { if (leftInBuf.size() == leftInBufferSize) { assert waitingLeft == 0; + checkState(); + prepareCorrelations(); if (waitingRight == -1) @@ -384,7 +377,11 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { leftIdx = 0; while (requested > 0 && leftIdx < leftInBuf.size()) { - checkState(); + if (processed++ > IN_BUFFER_SIZE) { + context().execute(this::join0, this::onError); + + return; + } Row left = leftInBuf.get(leftIdx); Row right = rightInBuf.get(rightIdx); @@ -434,6 +431,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { try { while (requested > 0 && notMatchedIdx < leftInBuf.size()) { + processed++; requested--; downstream().push(handler.concat(leftInBuf.get(notMatchedIdx), rightEmptyRow)); @@ -500,4 +498,13 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { context().setCorrelated(row, correlationIds.get(i).getId()); } } + + /** */ + private void join0() throws Exception { + checkState(); + + processed = 0; + + join(); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java index f845fb21d95..c3731b26037 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java @@ -198,8 +198,11 @@ public abstract class HashJoinNode<Row> extends AbstractRightMaterializedJoinNod hashStore.computeIfAbsent(key, k -> createRowList()).add(row); } - if (waitingRight == 0) + if (waitingRight == 0) { + checkState(); + rightSource().request(waitingRight = IN_BUFFER_SIZE); + } } /** */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java index 1f3f97e40d7..1c4d902c596 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java @@ -77,6 +77,9 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { /** */ protected boolean inLoop; + /** */ + private int processed; + /** * Flag indicating that at least one of the inputs has exchange underneath. In this case we can't prematurely end * downstream if one of the inputs is drained, we need to wait for both inputs, since async message from remote @@ -107,24 +110,16 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { assert !F.isEmpty(sources()) && sources().size() == 2; assert rowsCnt > 0 && requested == 0; - checkState(); - requested = rowsCnt; if (!inLoop) - context().execute(this::doJoin, this::onError); - } - - /** */ - private void doJoin() throws Exception { - checkState(); - - join(); + context().execute(this::join0, this::onError); } /** {@inheritDoc} */ @Override protected void rewindInternal() { requested = 0; + processed = 0; waitingLeft = 0; waitingRight = 0; @@ -184,8 +179,6 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { assert downstream() != null; assert waitingLeft > 0; - checkState(); - waitingLeft--; if (!finishing) @@ -199,8 +192,6 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { assert downstream() != null; assert waitingRight > 0; - checkState(); - waitingRight--; if (!finishing) @@ -214,8 +205,6 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { assert downstream() != null; assert waitingLeft > 0; - checkState(); - waitingLeft = NOT_WAITING; join(); @@ -226,8 +215,6 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { assert downstream() != null; assert waitingRight > 0; - checkState(); - waitingRight = NOT_WAITING; join(); @@ -339,7 +326,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { try { while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty() || rightMaterialization != null)) { - checkState(); + if (rescheduleJoin()) + return; if (left == null) left = leftInBuf.remove(); @@ -469,7 +457,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { try { while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty() || rightMaterialization != null || waitingRight == NOT_WAITING)) { - checkState(); + if (rescheduleJoin()) + return; if (left == null) { left = leftInBuf.remove(); @@ -622,7 +611,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { try { while (requested > 0 && !(left == null && leftInBuf.isEmpty() && waitingLeft != NOT_WAITING) && (right != null || !rightInBuf.isEmpty() || rightMaterialization != null)) { - checkState(); + if (rescheduleJoin()) + return; if (left == null && !leftInBuf.isEmpty()) left = leftInBuf.remove(); @@ -796,7 +786,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { try { while (requested > 0 && !(left == null && leftInBuf.isEmpty() && waitingLeft != NOT_WAITING) && !(right == null && rightInBuf.isEmpty() && rightMaterialization == null && waitingRight != NOT_WAITING)) { - checkState(); + if (rescheduleJoin()) + return; if (left == null && !leftInBuf.isEmpty()) { left = leftInBuf.remove(); @@ -975,7 +966,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { inLoop = true; try { while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty())) { - checkState(); + if (rescheduleJoin()) + return; if (left == null) left = leftInBuf.remove(); @@ -1031,7 +1023,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { try { while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && !(right == null && rightInBuf.isEmpty() && waitingRight != NOT_WAITING)) { - checkState(); + if (rescheduleJoin()) + return; if (left == null) left = leftInBuf.remove(); @@ -1070,4 +1063,24 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { tryToRequestInputs(); } } + + /** */ + private void join0() throws Exception { + checkState(); + + processed = 0; + + join(); + } + + /** */ + protected boolean rescheduleJoin() { + if (processed++ > IN_BUFFER_SIZE) { + context().execute(this::join0, this::onError); + + return true; + } + + return false; + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java index f2f42e3b68d..51b6d2b8518 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java @@ -66,16 +66,17 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractRightMaterializedJ assert downstream() != null; assert waitingRight > 0; - checkState(); - nodeMemoryTracker.onRowAdded(row); waitingRight--; rightMaterialized.add(row); - if (waitingRight == 0) + if (waitingRight == 0) { + checkState(); + rightSource().request(waitingRight = IN_BUFFER_SIZE); + } } /** */ @@ -138,7 +139,8 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractRightMaterializedJ left = leftInBuf.remove(); while (requested > 0 && rightIdx < rightMaterialized.size()) { - checkState(); + if (rescheduleJoin()) + return; if (!cond.test(left, rightMaterialized.get(rightIdx++))) continue; @@ -206,7 +208,8 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractRightMaterializedJ } while (requested > 0 && rightIdx < rightMaterialized.size()) { - checkState(); + if (rescheduleJoin()) + return; if (!cond.test(left, rightMaterialized.get(rightIdx++))) continue; @@ -294,7 +297,8 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractRightMaterializedJ left = leftInBuf.remove(); while (requested > 0 && rightIdx < rightMaterialized.size()) { - checkState(); + if (rescheduleJoin()) + return; Row right = rightMaterialized.get(rightIdx++); @@ -323,16 +327,16 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractRightMaterializedJ assert lastPushedInd >= 0; inLoop = true; + lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd); + try { - for (lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd);; - lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1) - ) { - checkState(); + Row emptyLeft = lastPushedInd < 0 ? null : leftRowFactory.create(); - if (lastPushedInd < 0) - break; + while (lastPushedInd >= 0) { + if (rescheduleJoin()) + return; - Row row = rowHnd.concat(leftRowFactory.create(), rightMaterialized.get(lastPushedInd)); + Row row = rowHnd.concat(emptyLeft, rightMaterialized.get(lastPushedInd)); rightNotMatchedIndexes.clear(lastPushedInd); @@ -341,6 +345,8 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractRightMaterializedJ if (lastPushedInd == Integer.MAX_VALUE || requested <= 0) break; + + lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1); } } finally { @@ -415,7 +421,8 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractRightMaterializedJ } while (requested > 0 && rightIdx < rightMaterialized.size()) { - checkState(); + if (rescheduleJoin()) + return; Row right = rightMaterialized.get(rightIdx++); @@ -456,16 +463,16 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractRightMaterializedJ assert lastPushedInd >= 0; inLoop = true; + lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd); + try { - for (lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd);; - lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1) - ) { - checkState(); + Row emptyLeft = lastPushedInd < 0 ? null : leftRowFactory.create(); - if (lastPushedInd < 0) - break; + while (lastPushedInd >= 0) { + if (rescheduleJoin()) + return; - Row row = rowHnd.concat(leftRowFactory.create(), rightMaterialized.get(lastPushedInd)); + Row row = rowHnd.concat(emptyLeft, rightMaterialized.get(lastPushedInd)); rightNotMatchedIndexes.clear(lastPushedInd); @@ -474,6 +481,8 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractRightMaterializedJ if (lastPushedInd == Integer.MAX_VALUE || requested <= 0) break; + + lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1); } } finally { @@ -505,7 +514,8 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractRightMaterializedJ boolean matched = false; while (!matched && requested > 0 && rightIdx < rightMaterialized.size()) { - checkState(); + if (rescheduleJoin()) + return; if (!cond.test(left, rightMaterialized.get(rightIdx++))) continue; @@ -549,7 +559,8 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractRightMaterializedJ boolean matched = false; while (!matched && rightIdx < rightMaterialized.size()) { - checkState(); + if (rescheduleJoin()) + return; if (cond.test(left, rightMaterialized.get(rightIdx++))) matched = true;
