This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new bf8b18da298 IGNITE-27660 SQL Calcite: Fix usage of checkState() in the
join nodes - Fixes #12680.
bf8b18da298 is described below
commit bf8b18da2987a8bd341536354dafeb9b6f51f34c
Author: Vladimir Steshin <[email protected]>
AuthorDate: Fri Feb 6 09:19:13 2026 +0300
IGNITE-27660 SQL Calcite: Fix usage of checkState() in the join nodes -
Fixes #12680.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../rel/AbstractRightMaterializedJoinNode.java | 1 +
.../exec/rel/CorrelatedNestedLoopJoinNode.java | 37 +++++++------
.../query/calcite/exec/rel/HashJoinNode.java | 5 +-
.../query/calcite/exec/rel/MergeJoinNode.java | 61 +++++++++++++---------
.../query/calcite/exec/rel/NestedLoopJoinNode.java | 57 ++++++++++++--------
5 files changed, 98 insertions(+), 63 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java
index 7dce770bcae..27c25f24e58 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java
@@ -81,6 +81,7 @@ public abstract class AbstractRightMaterializedJoinNode<Row>
extends MemoryTrack
waitingLeft = 0;
waitingRight = 0;
left = null;
+ processed = 0;
leftInBuf.clear();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
index b1d78912d49..8860cc10570 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.BiPredicate;
-
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
@@ -77,6 +76,9 @@ public class CorrelatedNestedLoopJoinNode<Row> extends
AbstractNode<Row> {
/** */
private int rightIdx;
+ /** */
+ private int processed;
+
/** */
private Row rightEmptyRow;
@@ -129,8 +131,6 @@ public class CorrelatedNestedLoopJoinNode<Row> extends
AbstractNode<Row> {
assert !F.isEmpty(sources()) && sources().size() == 2;
assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ",
requested=" + requested;
- checkState();
-
requested = rowsCnt;
onRequest();
@@ -196,8 +196,6 @@ public class CorrelatedNestedLoopJoinNode<Row> extends
AbstractNode<Row> {
assert downstream() != null;
assert waitingLeft > 0;
- checkState();
-
waitingLeft--;
if (leftInBuf == null)
@@ -213,8 +211,6 @@ public class CorrelatedNestedLoopJoinNode<Row> extends
AbstractNode<Row> {
assert downstream() != null;
assert waitingRight > 0;
- checkState();
-
waitingRight--;
if (rightInBuf == null)
@@ -269,9 +265,8 @@ public class CorrelatedNestedLoopJoinNode<Row> extends
AbstractNode<Row> {
assert F.isEmpty(rightInBuf);
context().execute(() -> {
- checkState();
-
state = State.FILLING_LEFT;
+
leftSource().request(waitingLeft = leftInBufferSize);
}, this::onError);
@@ -282,11 +277,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends
AbstractNode<Row> {
assert waitingRight == -1 || waitingRight == 0 &&
rightInBuf.size() == rightInBufferSize;
assert waitingLeft == -1 || waitingLeft == 0 &&
leftInBuf.size() == leftInBufferSize;
- context().execute(() -> {
- checkState();
-
- join();
- }, this::onError);
+ context().execute(this::join0, this::onError);
break;
@@ -308,6 +299,8 @@ public class CorrelatedNestedLoopJoinNode<Row> extends
AbstractNode<Row> {
if (leftInBuf.size() == leftInBufferSize) {
assert waitingLeft == 0;
+ checkState();
+
prepareCorrelations();
if (waitingRight == -1)
@@ -384,7 +377,11 @@ public class CorrelatedNestedLoopJoinNode<Row> extends
AbstractNode<Row> {
leftIdx = 0;
while (requested > 0 && leftIdx < leftInBuf.size()) {
- checkState();
+ if (processed++ > IN_BUFFER_SIZE) {
+ context().execute(this::join0, this::onError);
+
+ return;
+ }
Row left = leftInBuf.get(leftIdx);
Row right = rightInBuf.get(rightIdx);
@@ -434,6 +431,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends
AbstractNode<Row> {
try {
while (requested > 0 && notMatchedIdx < leftInBuf.size()) {
+ processed++;
requested--;
downstream().push(handler.concat(leftInBuf.get(notMatchedIdx), rightEmptyRow));
@@ -500,4 +498,13 @@ public class CorrelatedNestedLoopJoinNode<Row> extends
AbstractNode<Row> {
context().setCorrelated(row, correlationIds.get(i).getId());
}
}
+
+ /** */
+ private void join0() throws Exception {
+ checkState();
+
+ processed = 0;
+
+ join();
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java
index f845fb21d95..c3731b26037 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java
@@ -198,8 +198,11 @@ public abstract class HashJoinNode<Row> extends
AbstractRightMaterializedJoinNod
hashStore.computeIfAbsent(key, k -> createRowList()).add(row);
}
- if (waitingRight == 0)
+ if (waitingRight == 0) {
+ checkState();
+
rightSource().request(waitingRight = IN_BUFFER_SIZE);
+ }
}
/** */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
index 1f3f97e40d7..1c4d902c596 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
@@ -77,6 +77,9 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
/** */
protected boolean inLoop;
+ /** */
+ private int processed;
+
/**
* Flag indicating that at least one of the inputs has exchange
underneath. In this case we can't prematurely end
* downstream if one of the inputs is drained, we need to wait for both
inputs, since async message from remote
@@ -107,24 +110,16 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
assert !F.isEmpty(sources()) && sources().size() == 2;
assert rowsCnt > 0 && requested == 0;
- checkState();
-
requested = rowsCnt;
if (!inLoop)
- context().execute(this::doJoin, this::onError);
- }
-
- /** */
- private void doJoin() throws Exception {
- checkState();
-
- join();
+ context().execute(this::join0, this::onError);
}
/** {@inheritDoc} */
@Override protected void rewindInternal() {
requested = 0;
+ processed = 0;
waitingLeft = 0;
waitingRight = 0;
@@ -184,8 +179,6 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
assert downstream() != null;
assert waitingLeft > 0;
- checkState();
-
waitingLeft--;
if (!finishing)
@@ -199,8 +192,6 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
assert downstream() != null;
assert waitingRight > 0;
- checkState();
-
waitingRight--;
if (!finishing)
@@ -214,8 +205,6 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
assert downstream() != null;
assert waitingLeft > 0;
- checkState();
-
waitingLeft = NOT_WAITING;
join();
@@ -226,8 +215,6 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
assert downstream() != null;
assert waitingRight > 0;
- checkState();
-
waitingRight = NOT_WAITING;
join();
@@ -339,7 +326,8 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty())
&& (right != null || !rightInBuf.isEmpty()
|| rightMaterialization != null)) {
- checkState();
+ if (rescheduleJoin())
+ return;
if (left == null)
left = leftInBuf.remove();
@@ -469,7 +457,8 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty())
&& (right != null || !rightInBuf.isEmpty()
|| rightMaterialization != null || waitingRight ==
NOT_WAITING)) {
- checkState();
+ if (rescheduleJoin())
+ return;
if (left == null) {
left = leftInBuf.remove();
@@ -622,7 +611,8 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
try {
while (requested > 0 && !(left == null && leftInBuf.isEmpty()
&& waitingLeft != NOT_WAITING)
&& (right != null || !rightInBuf.isEmpty() ||
rightMaterialization != null)) {
- checkState();
+ if (rescheduleJoin())
+ return;
if (left == null && !leftInBuf.isEmpty())
left = leftInBuf.remove();
@@ -796,7 +786,8 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
try {
while (requested > 0 && !(left == null && leftInBuf.isEmpty()
&& waitingLeft != NOT_WAITING)
&& !(right == null && rightInBuf.isEmpty() &&
rightMaterialization == null && waitingRight != NOT_WAITING)) {
- checkState();
+ if (rescheduleJoin())
+ return;
if (left == null && !leftInBuf.isEmpty()) {
left = leftInBuf.remove();
@@ -975,7 +966,8 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
inLoop = true;
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty())
&& (right != null || !rightInBuf.isEmpty())) {
- checkState();
+ if (rescheduleJoin())
+ return;
if (left == null)
left = leftInBuf.remove();
@@ -1031,7 +1023,8 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty())
&&
!(right == null && rightInBuf.isEmpty() && waitingRight !=
NOT_WAITING)) {
- checkState();
+ if (rescheduleJoin())
+ return;
if (left == null)
left = leftInBuf.remove();
@@ -1070,4 +1063,24 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
tryToRequestInputs();
}
}
+
+ /** */
+ private void join0() throws Exception {
+ checkState();
+
+ processed = 0;
+
+ join();
+ }
+
+ /** */
+ protected boolean rescheduleJoin() {
+ if (processed++ > IN_BUFFER_SIZE) {
+ context().execute(this::join0, this::onError);
+
+ return true;
+ }
+
+ return false;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
index f2f42e3b68d..51b6d2b8518 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
@@ -66,16 +66,17 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractRightMaterializedJ
assert downstream() != null;
assert waitingRight > 0;
- checkState();
-
nodeMemoryTracker.onRowAdded(row);
waitingRight--;
rightMaterialized.add(row);
- if (waitingRight == 0)
+ if (waitingRight == 0) {
+ checkState();
+
rightSource().request(waitingRight = IN_BUFFER_SIZE);
+ }
}
/** */
@@ -138,7 +139,8 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractRightMaterializedJ
left = leftInBuf.remove();
while (requested > 0 && rightIdx <
rightMaterialized.size()) {
- checkState();
+ if (rescheduleJoin())
+ return;
if (!cond.test(left,
rightMaterialized.get(rightIdx++)))
continue;
@@ -206,7 +208,8 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractRightMaterializedJ
}
while (requested > 0 && rightIdx <
rightMaterialized.size()) {
- checkState();
+ if (rescheduleJoin())
+ return;
if (!cond.test(left,
rightMaterialized.get(rightIdx++)))
continue;
@@ -294,7 +297,8 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractRightMaterializedJ
left = leftInBuf.remove();
while (requested > 0 && rightIdx <
rightMaterialized.size()) {
- checkState();
+ if (rescheduleJoin())
+ return;
Row right = rightMaterialized.get(rightIdx++);
@@ -323,16 +327,16 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractRightMaterializedJ
assert lastPushedInd >= 0;
inLoop = true;
+ lastPushedInd =
rightNotMatchedIndexes.nextSetBit(lastPushedInd);
+
try {
- for (lastPushedInd =
rightNotMatchedIndexes.nextSetBit(lastPushedInd);;
- lastPushedInd =
rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1)
- ) {
- checkState();
+ Row emptyLeft = lastPushedInd < 0 ? null :
leftRowFactory.create();
- if (lastPushedInd < 0)
- break;
+ while (lastPushedInd >= 0) {
+ if (rescheduleJoin())
+ return;
- Row row = rowHnd.concat(leftRowFactory.create(),
rightMaterialized.get(lastPushedInd));
+ Row row = rowHnd.concat(emptyLeft,
rightMaterialized.get(lastPushedInd));
rightNotMatchedIndexes.clear(lastPushedInd);
@@ -341,6 +345,8 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractRightMaterializedJ
if (lastPushedInd == Integer.MAX_VALUE || requested <=
0)
break;
+
+ lastPushedInd =
rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1);
}
}
finally {
@@ -415,7 +421,8 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractRightMaterializedJ
}
while (requested > 0 && rightIdx <
rightMaterialized.size()) {
- checkState();
+ if (rescheduleJoin())
+ return;
Row right = rightMaterialized.get(rightIdx++);
@@ -456,16 +463,16 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractRightMaterializedJ
assert lastPushedInd >= 0;
inLoop = true;
+ lastPushedInd =
rightNotMatchedIndexes.nextSetBit(lastPushedInd);
+
try {
- for (lastPushedInd =
rightNotMatchedIndexes.nextSetBit(lastPushedInd);;
- lastPushedInd =
rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1)
- ) {
- checkState();
+ Row emptyLeft = lastPushedInd < 0 ? null :
leftRowFactory.create();
- if (lastPushedInd < 0)
- break;
+ while (lastPushedInd >= 0) {
+ if (rescheduleJoin())
+ return;
- Row row = rowHnd.concat(leftRowFactory.create(),
rightMaterialized.get(lastPushedInd));
+ Row row = rowHnd.concat(emptyLeft,
rightMaterialized.get(lastPushedInd));
rightNotMatchedIndexes.clear(lastPushedInd);
@@ -474,6 +481,8 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractRightMaterializedJ
if (lastPushedInd == Integer.MAX_VALUE || requested <=
0)
break;
+
+ lastPushedInd =
rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1);
}
}
finally {
@@ -505,7 +514,8 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractRightMaterializedJ
boolean matched = false;
while (!matched && requested > 0 && rightIdx <
rightMaterialized.size()) {
- checkState();
+ if (rescheduleJoin())
+ return;
if (!cond.test(left,
rightMaterialized.get(rightIdx++)))
continue;
@@ -549,7 +559,8 @@ public abstract class NestedLoopJoinNode<Row> extends
AbstractRightMaterializedJ
boolean matched = false;
while (!matched && rightIdx <
rightMaterialized.size()) {
- checkState();
+ if (rescheduleJoin())
+ return;
if (cond.test(left,
rightMaterialized.get(rightIdx++)))
matched = true;