This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-24675
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit f6840b903fd1fb07b95a8da3f11e987aecfebb84
Author: amashenkov <[email protected]>
AuthorDate: Fri Feb 28 16:48:59 2025 +0300

    Refactoring. Make execution nodes buffer size configurable.
---
 .../internal/sql/engine/exec/ExecutionContext.java |  15 +-
 .../sql/engine/exec/ExecutionServiceImpl.java      |   6 +-
 .../internal/sql/engine/exec/rel/AbstractNode.java |   3 +-
 .../sql/engine/exec/RuntimeSortedIndexTest.java    |   3 +-
 .../sql/engine/exec/rel/AbstractExecutionTest.java |  13 +-
 .../engine/exec/rel/AbstractJoinExecutionTest.java |   2 +-
 .../sql/engine/exec/rel/HashJoinExecutionTest.java | 217 +++++++++++++++------
 .../sql/engine/framework/TestBuilders.java         |   3 +-
 8 files changed, 196 insertions(+), 66 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 61002d13a71..44a92f027f4 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -47,6 +47,7 @@ import 
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningCol
 import 
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningMetadata;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.IgniteCheckedException;
@@ -65,6 +66,8 @@ public class ExecutionContext<RowT> implements DataContext {
      */
     private static final Locale LOCALE = Locale.ENGLISH;
 
+    private final int inBufSize;
+
     private final QueryTaskExecutor executor;
 
     private final ExecutionId executionId;
@@ -108,6 +111,7 @@ public class ExecutionContext<RowT> implements DataContext {
      * @param params Parameters.
      * @param txAttributes Transaction attributes.
      * @param timeZoneId Session time-zone ID.
+     * @param inBufSize Internal buffer size for execution nodes. 
A negative value selects {@code Commons.IN_BUFFER_SIZE}; zero is not allowed.
      */
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
     public ExecutionContext(
@@ -120,7 +124,8 @@ public class ExecutionContext<RowT> implements DataContext {
             RowHandler<RowT> handler,
             Map<String, Object> params,
             TxAttributes txAttributes,
-            ZoneId timeZoneId
+            ZoneId timeZoneId,
+            int inBufSize
     ) {
         this.expressionFactory = expressionFactory;
         this.executor = executor;
@@ -132,6 +137,9 @@ public class ExecutionContext<RowT> implements DataContext {
         this.originatingNodeName = originatingNodeName;
         this.txAttributes = txAttributes;
         this.timeZoneId = timeZoneId;
+        this.inBufSize = inBufSize < 0 ? Commons.IN_BUFFER_SIZE : inBufSize;
+
+        assert this.inBufSize > 0 : this.inBufSize;
 
         Instant nowUtc = Instant.now();
         startTs = 
nowUtc.plusSeconds(this.timeZoneId.getRules().getOffset(nowUtc).getTotalSeconds()).toEpochMilli();
@@ -223,6 +231,11 @@ public class ExecutionContext<RowT> implements DataContext 
{
         return localNode;
     }
 
+    /** Returns the internal buffer size that execution nodes use by default. */
+    public int defaultBufferSize() {
+        return inBufSize;
+    }
+
     /** {@inheritDoc} */
     @Override
     public SchemaPlus getRootSchema() {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 52e906d2781..dfce79cbef7 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -447,7 +447,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                 handler,
                 Commons.parametersMap(operationContext.parameters()),
                 TxAttributes.dummy(),
-                operationContext.timeZoneId()
+                operationContext.timeZoneId(),
+                -1
         );
 
         QueryTransactionContext txContext = operationContext.txContext();
@@ -935,7 +936,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                     handler,
                     Commons.parametersMap(ctx.parameters()),
                     txAttributes,
-                    ctx.timeZoneId()
+                    ctx.timeZoneId(),
+                    -1
             );
         }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
index 3b84aef7d37..5d233dae839 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
@@ -37,7 +37,7 @@ public abstract class AbstractNode<RowT> implements 
Node<RowT> {
 
     protected static final int IO_BATCH_CNT = Commons.IO_BATCH_COUNT;
 
-    protected final int inBufSize = Commons.IN_BUFFER_SIZE;
+    protected final int inBufSize;
 
     private final ExecutionContext<RowT> ctx;
 
@@ -58,6 +58,7 @@ public abstract class AbstractNode<RowT> implements 
Node<RowT> {
      */
     protected AbstractNode(ExecutionContext<RowT> ctx) {
         this.ctx = ctx;
+        this.inBufSize = ctx.defaultBufferSize();
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index 1766805a514..30b480ede6c 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -125,7 +125,8 @@ public class RuntimeSortedIndexTest extends 
IgniteAbstractTest {
                         ArrayRowHandler.INSTANCE,
                         Map.of(),
                         null,
-                        SqlQueryProcessor.DEFAULT_TIME_ZONE_ID
+                        SqlQueryProcessor.DEFAULT_TIME_ZONE_ID,
+                        -1
                 ),
                 RelCollations.of(ImmutableIntList.copyOf(idxCols)),
                 (o1, o2) -> {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index b541046341d..66b3bdb6f5e 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -92,10 +92,18 @@ public abstract class AbstractExecutionTest<T> extends 
IgniteAbstractTest {
     protected abstract RowHandler<T> rowHandler();
 
     protected ExecutionContext<T> executionContext() {
-        return executionContext(false);
+        return executionContext(-1, false);
+    }
+
+    protected ExecutionContext<T> executionContext(int defaultBufferSize) {
+        return executionContext(defaultBufferSize, false);
     }
 
     protected ExecutionContext<T> executionContext(boolean withDelays) {
+        return executionContext(-1, withDelays);
+    }
+
+    protected ExecutionContext<T> executionContext(int defaultBufferSize, 
boolean withDelays) {
         if (withDelays) {
             StripedThreadPoolExecutor testExecutor = new 
IgniteTestStripedThreadPoolExecutor(8,
                     NamedThreadFactory.create("fake-test-node", "sqlTestExec", 
log),
@@ -130,7 +138,8 @@ public abstract class AbstractExecutionTest<T> extends 
IgniteAbstractTest {
                 rowHandler(),
                 Map.of(),
                 TxAttributes.fromTx(new NoOpTransaction("fake-test-node", 
false)),
-                SqlQueryProcessor.DEFAULT_TIME_ZONE_ID
+                SqlQueryProcessor.DEFAULT_TIME_ZONE_ID,
+                defaultBufferSize // Negative value selects the default buffer size.
         );
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractJoinExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractJoinExecutionTest.java
index 18c9d6c280d..63288a804ea 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractJoinExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractJoinExecutionTest.java
@@ -396,7 +396,7 @@ public abstract class AbstractJoinExecutionTest extends 
AbstractExecutionTest<Ob
      * @param expRes   Expected result.
      */
     private void verifyJoin(Object[][] left, Object[][] right, JoinRelType 
joinType, Object[][] expRes, JoinAlgo algo) {
-        ExecutionContext<Object[]> ctx = executionContext(true);
+        ExecutionContext<Object[]> ctx = executionContext();
 
         IgniteTypeFactory tf = ctx.getTypeFactory();
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java
index ba85d926177..3c031d39796 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashJoinExecutionTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
+import static org.apache.calcite.rel.core.JoinRelType.ANTI;
+import static org.apache.calcite.rel.core.JoinRelType.FULL;
 import static org.apache.calcite.rel.core.JoinRelType.INNER;
 import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
 import static org.apache.calcite.rel.core.JoinRelType.SEMI;
@@ -28,18 +30,30 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.function.BiPredicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.type.NativeTypes;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Assumptions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 /** Hash join execution tests. */
 @SuppressWarnings("resource")
 public class HashJoinExecutionTest extends AbstractJoinExecutionTest {
+
+    public static final int DEFAULT_BUFFER_SIZE = Commons.IN_BUFFER_SIZE;
+
     @Override
     JoinAlgo joinAlgo() {
         return JoinAlgo.HASH;
@@ -47,7 +61,7 @@ public class HashJoinExecutionTest extends 
AbstractJoinExecutionTest {
 
     @Test
     public void testHashJoinRewind() {
-        ExecutionContext<Object[]> ctx = executionContext(true);
+        ExecutionContext<Object[]> ctx = executionContext();
 
         ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
                 new Object[]{0, "Igor", 1},
@@ -62,16 +76,7 @@ public class HashJoinExecutionTest extends 
AbstractJoinExecutionTest {
                 new Object[]{3, "QA"}
         ));
 
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-
-        RelDataType outType = TypeUtils.createRowType(tf, 
TypeUtils.native2relationalTypes(tf,
-                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32, 
NativeTypes.STRING, NativeTypes.INT32));
-        RelDataType leftType = TypeUtils.createRowType(tf, 
TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
-        RelDataType rightType = TypeUtils.createRowType(tf, 
TypeUtils.native2relationalTypes(tf,
-                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
-
-        AbstractRightMaterializedJoinNode<Object[]> join = 
HashJoinNode.create(ctx, outType, leftType, rightType, RIGHT,
-                JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(2)), 
null);
+        HashJoinNode<Object[]> join = createJoinNode(ctx, RIGHT, null);
 
         join.register(asList(deps, persons));
 
@@ -131,78 +136,161 @@ public class HashJoinExecutionTest extends 
AbstractJoinExecutionTest {
 
     @Test
     void innerHashJoinWithPostFiltration() {
-        ExecutionContext<Object[]> ctx = executionContext(true);
-
-        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+        Object[][] persons = {
                 new Object[]{0, "Igor", 1},
                 new Object[]{1, "Roman", 2},
                 new Object[]{2, "Ivan", 5},
                 new Object[]{3, "Alexey", 1}
-        ));
+        };
 
-        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+        Object[][] deps = {
                 new Object[]{1, "Core"},
                 new Object[]{2, "SQL"},
                 new Object[]{3, "QA"}
-        ));
-
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-
-        RelDataType outType = TypeUtils.createRowType(tf, 
TypeUtils.native2relationalTypes(tf,
-                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32, 
NativeTypes.STRING, NativeTypes.INT32));
-        RelDataType leftType = TypeUtils.createRowType(tf, 
TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
-        RelDataType rightType = TypeUtils.createRowType(tf, 
TypeUtils.native2relationalTypes(tf,
-                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
-
-        AbstractRightMaterializedJoinNode<Object[]> join = 
HashJoinNode.create(ctx, outType, leftType, rightType, INNER,
-                JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(2)), 
(l, r) -> ((String) l[1]).length() > 3);
-
-        join.register(asList(deps, persons));
-
-        RootNode<Object[]> node = new RootNode<>(ctx);
-        node.register(join);
-
-        ArrayList<Object[]> rows = new ArrayList<>();
-
-        while (node.hasNext()) {
-            rows.add(node.next());
-        }
+        };
 
         Object[][] expected = {
                 {1, "Core", 0, "Igor", 1},
                 {1, "Core", 3, "Alexey", 1},
         };
 
-        assert2DimArrayEquals(expected, rows);
+        BiPredicate<Object[], Object[]> condition = (l, r) -> ((String) 
l[1]).length() > 3;
+
+        validate(INNER, condition, Stream.of(deps)::iterator, 
Stream.of(persons)::iterator, expected);
     }
 
     @Test
     void semiHashJoinWithPostFiltration() {
-        ExecutionContext<Object[]> ctx = executionContext(true);
-
-        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+        Object[][] persons = {
                 new Object[]{0, "Igor", 1},
                 new Object[]{1, "Roman", 2},
                 new Object[]{2, "Ivan", 5},
                 new Object[]{3, "Alexey", 1}
-        ));
+        };
 
-        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+        Object[][] deps = {
                 new Object[]{1, "Core"},
                 new Object[]{2, "SQL"},
                 new Object[]{3, "QA"}
-        ));
+        };
 
-        IgniteTypeFactory tf = ctx.getTypeFactory();
+        Object[][] expected = {
+                {1, "Core"}
+        };
 
-        RelDataType leftType = TypeUtils.createRowType(tf, 
TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
-        RelDataType rightType = TypeUtils.createRowType(tf, 
TypeUtils.native2relationalTypes(tf,
-                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
+        BiPredicate<Object[], Object[]> condition = (l, r) -> ((String) 
l[1]).length() > 3;
 
-        AbstractRightMaterializedJoinNode<Object[]> join = 
HashJoinNode.create(ctx, leftType, leftType, rightType, SEMI,
-                JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(2)), 
(l, r) -> ((String) l[1]).length() > 3);
+        validate(SEMI, condition, Stream.of(deps)::iterator, 
Stream.of(persons)::iterator, expected);
+    }
 
-        join.register(asList(deps, persons));
+    @ParameterizedTest
+    @EnumSource(JoinRelType.class)
+    void checkHashJoinNodeWithDifferentBufferSize(JoinRelType joinType) {
+        Assumptions.assumeFalse(joinType == RIGHT, "RIGHT join type is buggy");
+        Assumptions.assumeFalse(joinType == FULL, "FULL join type is buggy");
+        Assumptions.assumeFalse(joinType == ANTI, "ANTI join type is buggy");
+
+        validateDistinctData(executionContext(1), joinType, 0, 0);
+        validateDistinctData(executionContext(1), joinType, 0, 1);
+        validateDistinctData(executionContext(1), joinType, 0, 10);
+        validateDistinctData(executionContext(1), joinType, 1, 0);
+        validateDistinctData(executionContext(1), joinType, 1, 10);
+        validateDistinctData(executionContext(1), joinType, 10, 0);
+        validateDistinctData(executionContext(1), joinType, 10, 1);
+        validateDistinctData(executionContext(1), joinType, 10, 10);
+
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
0, 0);
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
0, DEFAULT_BUFFER_SIZE - 1);
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
0, DEFAULT_BUFFER_SIZE);
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
0, DEFAULT_BUFFER_SIZE + 1);
+
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
DEFAULT_BUFFER_SIZE - 1, 0);
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
DEFAULT_BUFFER_SIZE - 1, DEFAULT_BUFFER_SIZE - 1);
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
DEFAULT_BUFFER_SIZE - 1, DEFAULT_BUFFER_SIZE);
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
DEFAULT_BUFFER_SIZE - 1, DEFAULT_BUFFER_SIZE + 1);
+
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
DEFAULT_BUFFER_SIZE, 0);
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
DEFAULT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE - 1);
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
DEFAULT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
DEFAULT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE + 1);
+
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
DEFAULT_BUFFER_SIZE + 1, 0);
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
DEFAULT_BUFFER_SIZE + 1, DEFAULT_BUFFER_SIZE - 1);
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
DEFAULT_BUFFER_SIZE + 1, DEFAULT_BUFFER_SIZE);
+        validateDistinctData(executionContext(DEFAULT_BUFFER_SIZE), joinType, 
DEFAULT_BUFFER_SIZE + 1, DEFAULT_BUFFER_SIZE + 1);
+    }
+
+    private static void validateDistinctData(
+            ExecutionContext<Object[]> ctx,
+            JoinRelType joinType,
+            int leftSize,
+            int rightSize
+    ) {
+        int resultSize = estimateResultSizeForDistinctInputs(joinType, 
leftSize, rightSize);
+
+        Object[] department = {1, "department"};
+        Object[] person = {1, "name", 2};
+        Iterable<Object[]> leftSource = IntStream.range(0, 
leftSize).mapToObj(i -> department)::iterator;
+        Iterable<Object[]> rightSource = IntStream.range(0, 
rightSize).mapToObj(i -> person)::iterator;
+
+        ScanNode<Object[]> left = new ScanNode<>(ctx, leftSource);
+        ScanNode<Object[]> right = new ScanNode<>(ctx, rightSource);
+
+        HashJoinNode<Object[]> join = createJoinNode(ctx, joinType, null);
+
+        join.register(asList(left, right));
+
+        RootNode<Object[]> node = new RootNode<>(ctx);
+        node.register(join);
+
+        int count = 0;
+        while (node.hasNext()) {
+            node.next();
+            count++;
+        }
+
+        assertEquals(resultSize, count);
+    }
+
+    private static int estimateResultSizeForDistinctInputs(
+            JoinRelType joinType,
+            int leftSize,
+            int rightSize
+    ) {
+        switch (joinType) {
+            case SEMI: // Fallthrough
+            case INNER:
+                return 0;
+            case ANTI: // Fallthrough
+            case LEFT:
+                return leftSize;
+            case RIGHT:
+                return rightSize;
+            case FULL:
+                return leftSize + rightSize;
+            case ASOF:
+            case LEFT_ASOF:
+                return Assumptions.abort("Unsupported join type: " + joinType);
+            default:
+                throw new IllegalArgumentException("Unsupported join type: " + 
joinType);
+        }
+    }
+
+    private void validate(
+            JoinRelType joinType,
+            @Nullable BiPredicate<Object[], Object[]> condition,
+            Iterable<Object[]> leftSource,
+            Iterable<Object[]> rightSource,
+            Object[][] expected
+    ) {
+        ExecutionContext<Object[]> ctx = executionContext(true);
+
+        ScanNode<Object[]> left = new ScanNode<>(ctx, leftSource);
+        ScanNode<Object[]> right = new ScanNode<>(ctx, rightSource);
+
+        HashJoinNode<Object[]> join = createJoinNode(ctx, joinType, condition);
+
+        join.register(asList(left, right));
 
         RootNode<Object[]> node = new RootNode<>(ctx);
         node.register(join);
@@ -213,13 +301,28 @@ public class HashJoinExecutionTest extends 
AbstractJoinExecutionTest {
             rows.add(node.next());
         }
 
-        Object[][] expected = {
-                {1, "Core"}
-        };
-
         assert2DimArrayEquals(expected, rows);
     }
 
+    private static HashJoinNode<Object[]> createJoinNode(
+            ExecutionContext<Object[]> ctx,
+            JoinRelType joinType,
+            @Nullable BiPredicate<Object[], Object[]> condition
+    ) {
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+
+        RelDataType leftType = TypeUtils.createRowType(tf, 
TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
+        RelDataType rightType = TypeUtils.createRowType(tf, 
TypeUtils.native2relationalTypes(tf,
+                NativeTypes.INT32, NativeTypes.STRING, NativeTypes.INT32));
+
+        RelDataType outType = (joinType == ANTI || joinType == SEMI)
+                ? leftType
+                : TypeUtils.combinedRowType(tf, leftType, rightType);
+
+        return HashJoinNode.create(ctx, outType, leftType, rightType, joinType,
+                JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(2)), 
condition);
+    }
+
     private static void assert2DimArrayEquals(Object[][] expected, 
ArrayList<Object[]> actual) {
         assertEquals(expected.length, actual.size(), "expected length: " + 
expected.length + ", actual length: " + actual.size());
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 3febb9de83e..ea16bf2ecd4 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -617,7 +617,8 @@ public class TestBuilders {
                     ArrayRowHandler.INSTANCE,
                     Commons.parametersMap(dynamicParams),
                     TxAttributes.fromTx(new NoOpTransaction(node.name(), 
false)),
-                    SqlQueryProcessor.DEFAULT_TIME_ZONE_ID
+                    SqlQueryProcessor.DEFAULT_TIME_ZONE_ID,
+                    -1
             );
         }
     }

Reply via email to