This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-24675 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 601bdddc349c8472bf15b764d47163642cd38ad9 Author: amashenkov <[email protected]> AuthorDate: Fri Feb 28 16:50:50 2025 +0300 Fix HashJoin node hanging for RIGHT and FULL OUTER joins --- .../internal/sql/engine/exec/rel/HashJoinNode.java | 47 +++++++++++++++++++--- .../sql/engine/exec/rel/HashJoinExecutionTest.java | 2 - 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinNode.java index bf8c88b1042..0e26a93e55c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinNode.java @@ -81,8 +81,8 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo assert leftJoinPositions.length == rightJoinPositions.length; this.outputRowFactory = outputRowFactory; - this.nonEquiCondition = nonEquiCondition != null - ? nonEquiCondition + this.nonEquiCondition = nonEquiCondition != null + ? nonEquiCondition : cast(ALWAYS_TRUE); } @@ -161,6 +161,7 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo inLoop = true; try { while (requested > 0 && (left != null || !leftInBuf.isEmpty())) { + // Proceed with next left row, if previous was fully processed. if (!rightIt.hasNext()) { left = leftInBuf.remove(); @@ -170,6 +171,7 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo } if (rightIt.hasNext()) { + // Emits matched rows. while (rightIt.hasNext()) { checkState(); @@ -238,11 +240,13 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo while (requested > 0 && (left != null || !leftInBuf.isEmpty())) { checkState(); + // Proceed with next left row, if previous was fully processed. if (!rightIt.hasNext()) { left = leftInBuf.remove(); Collection<RowT> rightRows = lookup(left); + // Emit unmatched left row. if (rightRows.isEmpty()) { requested--; downstream().push(outputRowFactory.concat(left, rightRowFactory.create())); @@ -252,6 +256,7 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo } if (rightIt.hasNext()) { + // Emits matched rows. while (rightIt.hasNext()) { checkState(); @@ -285,6 +290,8 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo /** Left row factory. */ private final RowHandler.RowFactory<RowT> leftRowFactory; + private boolean drainMaterialization; + /** * Creates HashJoinNode for RIGHT OUTER JOIN operator. * @@ -307,6 +314,14 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo this.leftRowFactory = leftRowFactory; } + /** {@inheritDoc} */ + @Override + protected void rewindInternal() { + drainMaterialization = false; + + super.rewindInternal(); + } + @Override protected void join() throws Exception { if (waitingRight == NOT_WAITING) { @@ -315,6 +330,7 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo while (requested > 0 && (left != null || !leftInBuf.isEmpty())) { checkState(); + // Proceed with next left row, if previous was fully processed. if (!rightIt.hasNext()) { left = leftInBuf.remove(); @@ -324,6 +340,7 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo } if (rightIt.hasNext()) { + // Emits matched rows. while (rightIt.hasNext()) { checkState(); @@ -349,10 +366,13 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo } } + // Emit unmatched right rows. if (left == null && leftInBuf.isEmpty() && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && requested > 0) { inLoop = true; try { - if (!rightIt.hasNext()) { + if (!rightIt.hasNext() && !drainMaterialization) { + // Prevent scanning store more than once. + drainMaterialization = true; rightIt = getUntouched(hashStore); } @@ -391,6 +411,8 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo /** Right row factory. */ private final RowHandler.RowFactory<RowT> rightRowFactory; + private boolean drainMaterialization; + /** * Creates HashJoinNode for FULL OUTER JOIN operator. * @@ -416,6 +438,14 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo this.rightRowFactory = rightRowFactory; } + /** {@inheritDoc} */ + @Override + protected void rewindInternal() { + drainMaterialization = false; + + super.rewindInternal(); + } + /** {@inheritDoc} */ @Override protected void join() throws Exception { @@ -425,11 +455,13 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo while (requested > 0 && (left != null || !leftInBuf.isEmpty())) { checkState(); + // Proceed with next left row, if previous was fully processed. if (!rightIt.hasNext()) { left = leftInBuf.remove(); Collection<RowT> rightRows = lookup(left); + // Emit unmatched left row. if (rightRows.isEmpty()) { requested--; downstream().push(outputRowFactory.concat(left, rightRowFactory.create())); @@ -439,6 +471,7 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo } if (rightIt.hasNext()) { + // Emits matched rows. while (rightIt.hasNext()) { checkState(); @@ -464,11 +497,13 @@ public abstract class HashJoinNode<RowT> extends AbstractRightMaterializedJoinNo } } - if (left == null && !rightIt.hasNext() && leftInBuf.isEmpty() && waitingLeft == NOT_WAITING - && waitingRight == NOT_WAITING && requested > 0) { + // Emit unmatched right rows. + if (left == null && leftInBuf.isEmpty() && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && requested > 0) { inLoop = true; try { - if (!rightIt.hasNext()) { + if (!rightIt.hasNext() && !drainMaterialization) { + // Prevent scanning store more than once. + drainMaterialization = true; rightIt = getUntouched(hashStore); } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java index 3c031d39796..6ea0c535c74 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java @@ -186,8 +186,6 @@ public class HashJoinExecutionTest extends AbstractJoinExecutionTest { @ParameterizedTest @EnumSource(JoinRelType.class) void checkHashJoinNodeWithDifferentBufferSize(JoinRelType joinType) { - Assumptions.assumeFalse(joinType == RIGHT, "RIGHT join type is buggy"); - Assumptions.assumeFalse(joinType == FULL, "FULL join type is buggy"); Assumptions.assumeFalse(joinType == ANTI, "ANTI join type is buggy"); validateDistinctData(executionContext(1), joinType, 0, 0);
