This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3438c301536 IGNITE-26968 Sql. Left join of 3 tables produces incorrect 
result (#6945)
3438c301536 is described below

commit 3438c301536e2a179efe05185ee8ad878cc4e2c8
Author: korlov42 <[email protected]>
AuthorDate: Fri Nov 14 13:11:51 2025 +0200

    IGNITE-26968 Sql. Left join of 3 tables produces incorrect result (#6945)
---
 .../apache/ignite/jdbc/ItJdbcJoinsSelfTest.java    |   5 +
 .../sql/engine/rel/AbstractIgniteJoin.java         |  95 ++++++----
 .../internal/sql/engine/rel/IgniteProject.java     |  12 +-
 .../internal/sql/engine/trait/TraitUtils.java      |  16 --
 .../ignite/internal/sql/engine/util/Commons.java   |  11 ++
 .../planner/IdentityDistributionPlannerTest.java   |  21 ++-
 .../engine/planner/JoinColocationPlannerTest.java  | 194 ++++++++++++++++++++-
 .../internal/sql/engine/util/CommonsTest.java      |  22 +++
 .../src/test/resources/tpch/plan/q13.plan          |  46 ++---
 9 files changed, 345 insertions(+), 77 deletions(-)

diff --git 
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcJoinsSelfTest.java
 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcJoinsSelfTest.java
index 5ae13251f63..ee9d5366b68 100644
--- 
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcJoinsSelfTest.java
+++ 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcJoinsSelfTest.java
@@ -29,6 +29,11 @@ import org.junit.jupiter.api.Test;
  * Tests for complex queries with joins.
  */
 public class ItJdbcJoinsSelfTest extends AbstractJdbcSelfTest {
+    @Override
+    protected int initialNodes() {
+        return 3;
+    }
+
     /**
      * Check distributed OUTER join of 3 tables (T1 -> T2 -> T3) returns 
correct result for non-collocated data.
      *
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIgniteJoin.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIgniteJoin.java
index 7f672db4d50..c3a0485d133 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIgniteJoin.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIgniteJoin.java
@@ -50,8 +50,8 @@ import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 import org.apache.calcite.util.mapping.IntPair;
 import org.apache.calcite.util.mapping.Mappings;
+import org.apache.calcite.util.mapping.Mappings.TargetMapping;
 import org.apache.ignite.internal.sql.engine.rel.explain.IgniteRelWriter;
-import org.apache.ignite.internal.sql.engine.trait.DistributionFunction;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
@@ -117,30 +117,22 @@ public abstract class AbstractIgniteJoin extends Join 
implements TraitsAwareIgni
     /** {@inheritDoc} */
     @Override
     public List<Pair<RelTraitSet, List<RelTraitSet>>> 
deriveDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // There are several rules:
-        // 1) any join is possible on broadcast or single distribution
-        // 2) hash distributed join is possible when join keys are superset of 
source distribution keys
-        // 3) hash and broadcast distributed tables can be joined when join 
keys equal to hash
-        //    distributed table distribution keys and:
-        //      3.1) it's a left join and a hash distributed table is at left
-        //      3.2) it's a right join and a hash distributed table is at right
-        //      3.3) it's an inner join, this case a hash distributed table 
may be at any side
-
         RelTraitSet left = inputTraits.get(0);
         RelTraitSet right = inputTraits.get(1);
 
         List<Pair<RelTraitSet, List<RelTraitSet>>> res = new ArrayList<>();
 
-        final IgniteDistribution leftDistr = TraitUtils.distribution(left);
-        final IgniteDistribution rightDistr = TraitUtils.distribution(right);
+        IgniteDistribution leftDistr = TraitUtils.distribution(left);
+        IgniteDistribution rightDistr = TraitUtils.distribution(right);
 
-        final IgniteDistribution left2rightProjectedDistr = 
leftDistr.apply(buildTransposeMapping(true));
-        final IgniteDistribution right2leftProjectedDistr = 
rightDistr.apply(buildTransposeMapping(false));
+        IgniteDistribution left2rightProjectedDistr = 
leftDistr.apply(buildTransposeMapping(true));
+        IgniteDistribution right2leftProjectedDistr = 
rightDistr.apply(buildTransposeMapping(false));
 
         RelTraitSet outTraits;
         RelTraitSet leftTraits;
         RelTraitSet rightTraits;
 
+        // Any join is possible with single and broadcast distributions.
         if (leftDistr == broadcast() && rightDistr == broadcast()) {
             outTraits = nodeTraits.replace(broadcast());
             leftTraits = left.replace(broadcast());
@@ -157,32 +149,75 @@ public abstract class AbstractIgniteJoin extends Join 
implements TraitsAwareIgni
             return List.copyOf(res);
         }
 
-        if (leftDistr.getType() == HASH_DISTRIBUTED && 
left2rightProjectedDistr != random()) {
-            outTraits = nodeTraits.replace(leftDistr);
-            leftTraits = left.replace(leftDistr);
-            rightTraits = right.replace(left2rightProjectedDistr);
+        TargetMapping offsetByLeftSize = Commons.targetOffsetMapping(
+                this.right.getRowType().getFieldCount(),
+                this.left.getRowType().getFieldCount()
+        );
 
-            res.add(Pair.of(outTraits, List.of(leftTraits, rightTraits)));
+        // Re-hashing of right side to left.
+        if (leftDistr.getType() == HASH_DISTRIBUTED && 
left2rightProjectedDistr != random()) {
+            computeHashOutputOptions(nodeTraits, left, leftDistr, right, 
left2rightProjectedDistr, res, offsetByLeftSize);
         }
 
+        // Re-hashing of left side to right.
         if (rightDistr.getType() == HASH_DISTRIBUTED && 
right2leftProjectedDistr != random()) {
-            outTraits = nodeTraits.replace(rightDistr);
-            leftTraits = left.replace(right2leftProjectedDistr);
-            rightTraits = right.replace(rightDistr);
+            computeHashOutputOptions(nodeTraits, left, 
right2leftProjectedDistr, right, rightDistr, res, offsetByLeftSize);
+        }
 
+        // Full re-hashing by join keys. 
+        computeHashOutputOptions(
+                nodeTraits, left, hash(joinInfo.leftKeys), right, 
hash(joinInfo.rightKeys), res, offsetByLeftSize
+        );
+
+        return List.copyOf(res);
+    }
+
+    private void computeHashOutputOptions(
+            RelTraitSet nodeCurrent,
+            RelTraitSet leftCurrent,
+            IgniteDistribution newLeftDistribution,
+            RelTraitSet rightCurrent,
+            IgniteDistribution newRightDistribution,
+            List<Pair<RelTraitSet, List<RelTraitSet>>> res,
+            TargetMapping offsetByLeftSize
+    ) {
+        if (newLeftDistribution.getType() != HASH_DISTRIBUTED 
+                || newRightDistribution.getType() != HASH_DISTRIBUTED) {
+            // Throw this error explicitly, otherwise we might get an incorrect 
plan which
+            // emits a wrong result.
+            throw new AssertionError("Only hash-based distribution is 
allowed");
+        }
+
+        RelTraitSet leftTraits = leftCurrent.replace(newLeftDistribution);
+        RelTraitSet rightTraits = rightCurrent.replace(newRightDistribution);
+
+        RelTraitSet outTraits;
+
+        // Depending on the type of the join, we may produce output 
distribution based
+        // on left keys, right keys, or both.
+        if (shouldEmitLeftSideDistribution(joinType)) {
+            outTraits = nodeCurrent.replace(newLeftDistribution);
             res.add(Pair.of(outTraits, List.of(leftTraits, rightTraits)));
         }
 
-        leftTraits = left.replace(hash(joinInfo.leftKeys, 
DistributionFunction.hash()));
-        rightTraits = right.replace(hash(joinInfo.rightKeys, 
DistributionFunction.hash()));
+        if (shouldEmitRightSideDistribution(joinType)) {
+            outTraits = 
nodeCurrent.replace(newRightDistribution.apply(offsetByLeftSize));
+            res.add(Pair.of(outTraits, List.of(leftTraits, rightTraits)));
+        }
 
-        outTraits = nodeTraits.replace(hash(joinInfo.leftKeys, 
DistributionFunction.hash()));
-        res.add(Pair.of(outTraits, List.of(leftTraits, rightTraits)));
+        // If we cannot emit anything meaningful, then emit output with random 
distribution.
+        if (!shouldEmitLeftSideDistribution(joinType) && 
!shouldEmitRightSideDistribution(joinType)) {
+            outTraits = nodeCurrent.replace(random());
+            res.add(Pair.of(outTraits, List.of(leftTraits, rightTraits)));
+        }
+    }
 
-        outTraits = nodeTraits.replace(hash(joinInfo.rightKeys, 
DistributionFunction.hash()));
-        res.add(Pair.of(outTraits, List.of(leftTraits, rightTraits)));
+    private static boolean shouldEmitLeftSideDistribution(JoinRelType 
joinType) {
+        return !joinType.generatesNullsOnLeft();
+    }
 
-        return List.copyOf(res);
+    private static boolean shouldEmitRightSideDistribution(JoinRelType 
joinType) {
+        return joinType.projectsRight() && !joinType.generatesNullsOnRight();
     }
 
     /** {@inheritDoc} */
@@ -252,7 +287,7 @@ public abstract class AbstractIgniteJoin extends Join 
implements TraitsAwareIgni
                 }
 
                 // We cannot provide random distribution without unique 
constraint on join keys,
-                // so, we require hash distribution (wich satisfies random 
distribution) instead.
+                // so, we require hash distribution (which satisfies random 
distribution) instead.
                 IgniteDistribution outDistr = distrType == HASH_DISTRIBUTED
                         ? IgniteDistributions.clone(distribution, 
joinInfo.leftKeys)
                         : hash(joinInfo.leftKeys);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteProject.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteProject.java
index 9b5c6d599fd..a84f56a3f03 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteProject.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteProject.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.rel;
 import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
 import static 
org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable.RAND_UUID;
 import static 
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.broadcast;
+import static 
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.random;
 import static 
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
 import static 
org.apache.ignite.internal.sql.engine.trait.TraitUtils.changeTraits;
 
@@ -232,10 +233,15 @@ public class IgniteProject extends Project implements 
TraitsAwareIgniteRel {
 
         RelOptCost cost = planner.getCostFactory().makeCost(rowCount, rowCount 
* IgniteCost.ROW_PASS_THROUGH_COST, 0);
 
-        if (distribution() == single()) {
-            // make single distributed projection slightly more expensive to 
help
-            // planner prefer distributed option, if exists
+        // Small adjustment to cost to:
+        //     1) help optimizer to choose distributed plan over single
+        //     2) make result stable when several equivalent options exist,
+        //        but projection converts one of them to random
+        if (distribution() == random()) {
             cost = cost.plus(planner.getCostFactory().makeTinyCost());
+        } else if (distribution() == single()) {
+            // Between single and random, the latter should be preferable.
+            cost = 
cost.plus(planner.getCostFactory().makeTinyCost().multiplyBy(2));
         }
 
         return cost;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
index bbc41b8432e..e7feebf3a63 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
@@ -27,7 +27,6 @@ import static 
org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.util.CollectionUtils.first;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
-import com.google.common.collect.ImmutableList;
 import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntList;
 import java.lang.reflect.Proxy;
@@ -220,21 +219,6 @@ public class TraitUtils {
         return traits.getTrait(DistributionTraitDef.INSTANCE);
     }
 
-    /**
-     * Check distribution definition in traits.
-     *
-     * @param traitDefs Traits to analyze.
-     * @return {@code true} if distribution found, {@code false} otherwise.
-     */
-    public static boolean distributionPresent(ImmutableList<RelTraitDef> 
traitDefs) {
-        for (RelTraitDef<?> trait : traitDefs) {
-            if (trait instanceof DistributionTraitDef) {
-                return true;
-            }
-        }
-        return false;
-    }
-
     /**
      * Collation. TODO Documentation 
https://issues.apache.org/jira/browse/IGNITE-15859
      */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index f571e186a73..b803673180c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -842,4 +842,15 @@ public final class Commons {
         }
         return true;
     }
+
+    /**
+     * Creates {@link Mappings.TargetMapping} such that for every given source 
within [0, sourceCount) the resulting target equals the source
+     * shifted by the specified offset (i.e. {@code target = source + offset}).
+     */
+    public static TargetMapping targetOffsetMapping(int sourceCount, int 
offset) {
+        return Mappings.offsetTarget(
+                Mappings.createIdentity(sourceCount),
+                offset
+        );
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
index 1534e8d4987..ce7775f325d 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
@@ -54,7 +54,12 @@ public class IdentityDistributionPlannerTest extends 
AbstractPlannerTest {
 
         assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
                 .and(nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
-                        .and(hasDistribution(IgniteDistributions.identity(0)))
+                        .and(hasDistribution(IgniteDistributions.identity(0))
+                                // This is projection of complexTbl 
distribution on the right side of join.
+                                // That is, for this equi-join, distribution 
might be either equal to one on the left side
+                                // or to its counterpart derived as projection 
of distribution keys of left side through
+                                // join pairs on right side.
+                                
.or(hasDistribution(IgniteDistributions.identity(1))))
                         .and(input(0, isInstanceOf(IgniteIndexScan.class)))
                         .and(input(1, isInstanceOf(IgniteIndexScan.class)))
                 ))
@@ -131,7 +136,12 @@ public class IdentityDistributionPlannerTest extends 
AbstractPlannerTest {
 
         assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
                 .and(nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
-                        .and(hasDistribution(IgniteDistributions.identity(0)))
+                        .and(hasDistribution(IgniteDistributions.identity(0))
+                                // This is projection of complexTbl 
distribution on the right side of join.
+                                // That is, for this equi-join, distribution 
might be either equal to one on the left side
+                                // or to its counterpart derived as projection 
of distribution keys of left side through
+                                // join pairs on right side.
+                                
.or(hasDistribution(IgniteDistributions.identity(1))))
                         .and(input(0, isInstanceOf(IgniteIndexScan.class)))
                         .and(input(1, isInstanceOf(IgniteIndexScan.class)))
                 ))
@@ -154,7 +164,12 @@ public class IdentityDistributionPlannerTest extends 
AbstractPlannerTest {
 
         assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
                 .and(nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
-                        .and(hasDistribution(IgniteDistributions.identity(0)))
+                        .and(hasDistribution(IgniteDistributions.identity(0))
+                                // This is projection of complexTbl 
distribution on the right side of join.
+                                // That is, for this equi-join, distribution 
might be either equal to one on the left side
+                                // or to its counterpart derived as projection 
of distribution keys of left side through
+                                // join pairs on right side.
+                                
.or(hasDistribution(IgniteDistributions.identity(1))))
                         .and(input(0, isInstanceOf(IgniteIndexScan.class)))
                         .and(input(1, isInstanceOf(IgniteTrimExchange.class)
                                 
.and(input(isInstanceOf(IgniteIndexScan.class)))
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
index 1d36e7dcdef..f9f976af8be 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
@@ -28,7 +28,9 @@ import java.util.List;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.mapping.Mappings.TargetMapping;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
 import org.apache.ignite.internal.sql.engine.rel.AbstractIgniteJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
@@ -39,7 +41,10 @@ import 
org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.type.NativeTypes;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -190,8 +195,9 @@ public class JoinColocationPlannerTest extends 
AbstractPlannerTest {
      */
     @Test
     public void joinComplexToSimpleAff() throws Exception {
+        int tableId = nextTableId();
         IgniteTable complexTbl = complexTbl("COMPLEX_TBL", 2 * 
DEFAULT_TBL_SIZE,
-                TestBuilders.affinity(ImmutableIntList.of(0, 1), 
nextTableId(), DEFAULT_ZONE_ID));
+                TestBuilders.affinity(ImmutableIntList.of(0, 1), tableId, 
DEFAULT_ZONE_ID));
 
         IgniteTable simpleTbl = simpleTable("SIMPLE_TBL", DEFAULT_TBL_SIZE);
 
@@ -202,7 +208,12 @@ public class JoinColocationPlannerTest extends 
AbstractPlannerTest {
                 + "join SIMPLE_TBL t2 on t1.id1 = t2.id and t1.id2 = t2.id2";
 
         assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
-                .and(hasDistribution(complexTbl.distribution()))
+                .and(hasDistribution(complexTbl.distribution())
+                        // This is projection of complexTbl distribution on 
the right side of join.
+                        // That is, for this equi-join, distribution might be 
either equal to one on the left side
+                        // or to its counterpart derived as projection of 
distribution keys of left side through
+                        // join pairs on right side.
+                        
.or(hasDistribution(TestBuilders.affinity(ImmutableIntList.of(2, 3), tableId, 
DEFAULT_ZONE_ID))))
                 .and(input(0, isInstanceOf(IgniteIndexScan.class)
                         .and(scan -> 
complexTbl.equals(scan.getTable().unwrap(IgniteTable.class)))
                 ))
@@ -216,6 +227,168 @@ public class JoinColocationPlannerTest extends 
AbstractPlannerTest {
         ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", 
"HashJoinConverter");
     }
 
+    // Test cases in this test may emit multiple equivalent plans.
+    // It's not deterministic because for equivalent options the order is
+    // sometimes determined by the order of entries in a hash-based collection
+    // (like HashMap). Let's repeat this test a few times to cover as much as
+    // possible. This test is relatively fast.
+    @RepeatedTest(10)
+    void joinOutputDistribution() throws Exception {
+        int bigTableId = nextTableId();
+        IgniteTable bigTable = table(
+                "BIG_TBL", 2 * DEFAULT_TBL_SIZE, bigTableId,
+                "ID", "C1", "C2", "C3"
+        );
+        int smallTableId = nextTableId();
+        IgniteTable smallTable = table(
+                "SMALL_TBL", DEFAULT_TBL_SIZE, smallTableId,
+                "ID", "C1", "C2", "C3", "C4"
+        );
+
+        IgniteSchema schema = createSchema(bigTable, smallTable);
+
+        {
+            // Case: Inner join, big table on the left.
+            // Expected output distribution either original from big table, or 
projection
+            // of big table distribution to right side.
+            String sql = "SELECT /*+ enforce_join_order */ "
+                    + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 + 
st.c2 + st.c3 + st.c4) "
+                    + "FROM big_tbl bt "
+                    + "JOIN small_tbl st ON bt.id = st.c4";
+
+            assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+                    .and(hasDistribution(bigTable.distribution())
+                            
.or(hasDistribution(TestBuilders.affinity(ImmutableIntList.of(
+                                    4 /* index of C4 column in small_table */
+                                            + 
bigTable.descriptor().rowTypeSansHidden().getFieldCount()
+                            ), bigTableId, DEFAULT_ZONE_ID)))
+                    )
+            ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", 
"HashJoinConverter");
+        }
+
+        {
+            // Case: Inner join, big table on the right.
+            // Expected output distribution either distribution of big table 
shifted
+            // by the size of small table, or projection of big table 
distribution to
+            // left side.
+            String sql = "SELECT /*+ enforce_join_order */ "
+                    + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 + 
st.c2 + st.c3 + st.c4) "
+                    + "FROM small_tbl st "
+                    + "JOIN big_tbl bt ON bt.id = st.c4";
+
+            TargetMapping offsetMapping = Commons.targetOffsetMapping(
+                    bigTable.descriptor().rowTypeSansHidden().getFieldCount(),
+                    smallTable.descriptor().rowTypeSansHidden().getFieldCount()
+            );
+
+            assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+                    
.and(hasDistribution(bigTable.distribution().apply(offsetMapping))
+                            
.or(hasDistribution(TestBuilders.affinity(ImmutableIntList.of(
+                                    4 /* index of C4 column in small_table */
+                            ), bigTableId, DEFAULT_ZONE_ID)))
+                    )
+            ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", 
"HashJoinConverter");
+        }
+
+        {
+            // Case: Left join, big table on the left.
+            // Expected output distribution is original from big table only. 
Projection
+            // of big table distribution is not expected since left join may 
emit NULLS on right side.
+            String sql = "SELECT /*+ enforce_join_order */ "
+                    + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 + 
st.c2 + st.c3 + st.c4) "
+                    + "FROM big_tbl bt "
+                    + "LEFT JOIN small_tbl st ON bt.id = st.c4";
+
+            assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+                    .and(hasDistribution(bigTable.distribution()))
+            ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", 
"HashJoinConverter");
+        }
+
+        {
+            // Case: Left join, big table on the right.
+            // Expected output distribution is projection of big table 
distribution to
+            // left side only. Distribution of big table shifted by the size 
of small
+            // table is not expected since left join may emit NULLS on right 
side.
+            String sql = "SELECT /*+ enforce_join_order */ "
+                    + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 + 
st.c2 + st.c3 + st.c4) "
+                    + "FROM small_tbl st "
+                    + "LEFT JOIN big_tbl bt ON bt.id = st.c4";
+
+            assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+                    
.and(hasDistribution(TestBuilders.affinity(ImmutableIntList.of(
+                                    4 /* index of C4 column in small_table */
+                            ), bigTableId, DEFAULT_ZONE_ID))
+                    )
+            ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", 
"HashJoinConverter");
+        }
+
+        {
+            // Case: Right join, big table on the left.
+            // Expected output distribution is projection of big table 
distribution to
+            // right side only. Distribution of big table is not expected 
since right
+            // join may emit NULLS on left side.
+            String sql = "SELECT /*+ enforce_join_order */ "
+                    + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 + 
st.c2 + st.c3 + st.c4) "
+                    + "FROM big_tbl bt "
+                    + "RIGHT JOIN small_tbl st ON bt.id = st.c4";
+
+            assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+                    
.and(hasDistribution(TestBuilders.affinity(ImmutableIntList.of(
+                                    4 /* index of C4 column in small_table */
+                                            + 
bigTable.descriptor().rowTypeSansHidden().getFieldCount()
+                            ), bigTableId, DEFAULT_ZONE_ID))
+                    )
+            ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", 
"HashJoinConverter");
+        }
+
+        {
+            // Case: Right join, big table on the right.
+            // Here we expect smaller table to be sent to bigger table.
+            // Hence, expected distribution is distribution of big table 
shifted
+            // by the size of small table only. Projection of big table 
distribution is not
+            // expected since right join may emit NULLS on left side.
+            String sql = "SELECT /*+ enforce_join_order */ "
+                    + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 + 
st.c2 + st.c3 + st.c4) "
+                    + "FROM small_tbl st "
+                    + "RIGHT JOIN big_tbl bt ON bt.id = st.c4";
+
+            TargetMapping offsetMapping = Commons.targetOffsetMapping(
+                    bigTable.descriptor().rowTypeSansHidden().getFieldCount(),
+                    smallTable.descriptor().rowTypeSansHidden().getFieldCount()
+            );
+
+            assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+                    
.and(hasDistribution(bigTable.distribution().apply(offsetMapping)))
+            ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", 
"HashJoinConverter");
+        }
+
+        {
+            // Case: Outer join, big table on the left.
+            // Expected output distribution is random since outer join may 
emit NULLS on both sides.
+            String sql = "SELECT /*+ enforce_join_order */ "
+                    + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 + 
st.c2 + st.c3 + st.c4) "
+                    + "FROM big_tbl bt "
+                    + "FULL OUTER JOIN small_tbl st ON bt.id = st.c4";
+
+            assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+                    .and(hasDistribution(IgniteDistributions.random()))
+            ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", 
"HashJoinConverter");
+        }
+
+        {
+            // Case: Outer join, big table on the right.
+            // Expected output distribution is random since outer join may 
emit NULLS on both sides.
+            String sql = "SELECT /*+ enforce_join_order */ "
+                    + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 + 
st.c2 + st.c3 + st.c4) "
+                    + "FROM small_tbl st "
+                    + "FULL OUTER JOIN big_tbl bt ON bt.id = st.c4";
+
+            assertPlan(sql, schema, 
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+                    .and(hasDistribution(IgniteDistributions.random()))
+            ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", 
"HashJoinConverter");
+        }
+    }
+
     /**
      * Re-hashing for complex affinity.
      *
@@ -304,4 +477,21 @@ public class JoinColocationPlannerTest extends AbstractPlannerTest {
                 .end()
                 .build();
     }
+
+    private static IgniteTable table(String tableName, int size, int tableId, String... columnNames) { // Test table builder: every column is INT32, sorted index "PK" on the first name.
+        TableBuilder builder = TestBuilders.table()
+                .name(tableName)
+                .size(size)
+                .distribution(TestBuilders.affinity(ImmutableIntList.of(0), tableId, DEFAULT_ZONE_ID)) // distribution keys: column index 0
+                .sortedIndex()
+                .name("PK")
+                .addColumn(columnNames[0], Collation.ASC_NULLS_LAST) // NOTE(review): requires at least one column name, otherwise ArrayIndexOutOfBoundsException
+                .end();
+
+        for (String name : columnNames) {
+            builder.addColumn(name, NativeTypes.INT32);
+        }
+
+        return builder.build();
+    }
 }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
index 9062e14ab03..c5e742af1e3 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.util;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertSame;
@@ -33,10 +35,13 @@ import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.mapping.Mapping;
 import org.apache.calcite.util.mapping.Mappings;
+import org.apache.calcite.util.mapping.Mappings.TargetMapping;
 import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.mockito.Mockito;
 
 /**
@@ -140,6 +145,23 @@ public class CommonsTest extends BaseIgniteAbstractTest {
         assertEquals(lt, project4.getRowType(), "Invalid types in projection 
for node4");
     }
 
+    @ParameterizedTest
+    @CsvSource({ // columns: sourceSize, offset
+            "3, 3",
+            "5, 5",
+            "2, 7",
+    })
+    void targetOffset(int sourceSize, int offset) { // Checks that every source index i in [0, sourceSize) maps to target i + offset.
+        TargetMapping mapping = Commons.targetOffsetMapping(sourceSize, offset);
+
+        for (int i = 0; i < sourceSize; i++) {
+            assertThat(
+                    "Source <" + i + "> should be shifted by offset <" + offset + ">",
+                    mapping.getTarget(i), is(i + offset)
+            );
+        }
+    }
+
     private static void expectMapped(Mapping mapping, ImmutableBitSet bitSet, 
ImmutableBitSet expected) {
         assertEquals(expected, Mappings.apply(mapping, bitSet), "direct 
mapping");
 
diff --git a/modules/sql-engine/src/test/resources/tpch/plan/q13.plan b/modules/sql-engine/src/test/resources/tpch/plan/q13.plan
index bf033bbdb25..c3737692cd5 100644
--- a/modules/sql-engine/src/test/resources/tpch/plan/q13.plan
+++ b/modules/sql-engine/src/test/resources/tpch/plan/q13.plan
@@ -6,34 +6,34 @@ Sort
       group: [C_COUNT]
       aggregation: [COUNT()]
       est: (rows=75000)
-    Project
-        fieldNames: [C_COUNT]
-        projection: [EXPR$1]
+    Exchange
+        distribution: single
         est: (rows=75000)
-      ColocatedHashAggregate
-          fieldNames: [C_CUSTKEY, EXPR$1]
-          group: [C_CUSTKEY]
-          aggregation: [COUNT(O_ORDERKEY)]
+      Project
+          fieldNames: [C_COUNT]
+          projection: [EXPR$1]
           est: (rows=75000)
-        Project
-            fieldNames: [C_CUSTKEY, O_ORDERKEY]
-            projection: [C_CUSTKEY, O_ORDERKEY]
-            est: (rows=375000)
-          HashJoin
-              predicate: =(C_CUSTKEY, O_CUSTKEY)
-              type: right
+        ColocatedHashAggregate
+            fieldNames: [C_CUSTKEY, EXPR$1]
+            group: [C_CUSTKEY]
+            aggregation: [COUNT(O_ORDERKEY)]
+            est: (rows=75000)
+          Project
+              fieldNames: [C_CUSTKEY, O_ORDERKEY]
+              projection: [C_CUSTKEY, O_ORDERKEY]
               est: (rows=375000)
-            Exchange
-                distribution: single
+            HashJoin
+                predicate: =(C_CUSTKEY, O_CUSTKEY)
+                type: right
                 est: (rows=375000)
-              TableScan
-                  table: PUBLIC.ORDERS
-                  predicate: NOT(LIKE(O_COMMENT, _UTF-8'%special%requests%'))
-                  fieldNames: [O_ORDERKEY, O_CUSTKEY, O_COMMENT]
+              Exchange
+                  distribution: table PUBLIC.CUSTOMER in zone "Default" by [O_CUSTKEY]
                   est: (rows=375000)
-            Exchange
-                distribution: single
-                est: (rows=150000)
+                TableScan
+                    table: PUBLIC.ORDERS
+                    predicate: NOT(LIKE(O_COMMENT, _UTF-8'%special%requests%'))
+                    fieldNames: [O_ORDERKEY, O_CUSTKEY, O_COMMENT]
+                    est: (rows=375000)
               TableScan
                   table: PUBLIC.CUSTOMER
                   fieldNames: [C_CUSTKEY]


Reply via email to