This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3438c301536 IGNITE-26968 Sql. Left join of 3 tables produces incorrect
result (#6945)
3438c301536 is described below
commit 3438c301536e2a179efe05185ee8ad878cc4e2c8
Author: korlov42 <[email protected]>
AuthorDate: Fri Nov 14 13:11:51 2025 +0200
IGNITE-26968 Sql. Left join of 3 tables produces incorrect result (#6945)
---
.../apache/ignite/jdbc/ItJdbcJoinsSelfTest.java | 5 +
.../sql/engine/rel/AbstractIgniteJoin.java | 95 ++++++----
.../internal/sql/engine/rel/IgniteProject.java | 12 +-
.../internal/sql/engine/trait/TraitUtils.java | 16 --
.../ignite/internal/sql/engine/util/Commons.java | 11 ++
.../planner/IdentityDistributionPlannerTest.java | 21 ++-
.../engine/planner/JoinColocationPlannerTest.java | 194 ++++++++++++++++++++-
.../internal/sql/engine/util/CommonsTest.java | 22 +++
.../src/test/resources/tpch/plan/q13.plan | 46 ++---
9 files changed, 345 insertions(+), 77 deletions(-)
diff --git
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcJoinsSelfTest.java
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcJoinsSelfTest.java
index 5ae13251f63..ee9d5366b68 100644
---
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcJoinsSelfTest.java
+++
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcJoinsSelfTest.java
@@ -29,6 +29,11 @@ import org.junit.jupiter.api.Test;
* Tests for complex queries with joins.
*/
public class ItJdbcJoinsSelfTest extends AbstractJdbcSelfTest {
+ @Override
+ protected int initialNodes() {
+ return 3; // Presumably several nodes are needed so that the joined data is really non-collocated.
+ }
+
/**
* Check distributed OUTER join of 3 tables (T1 -> T2 -> T3) returns
correct result for non-collocated data.
*
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIgniteJoin.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIgniteJoin.java
index 7f672db4d50..c3a0485d133 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIgniteJoin.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/AbstractIgniteJoin.java
@@ -50,8 +50,8 @@ import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
import org.apache.calcite.util.mapping.IntPair;
import org.apache.calcite.util.mapping.Mappings;
+import org.apache.calcite.util.mapping.Mappings.TargetMapping;
import org.apache.ignite.internal.sql.engine.rel.explain.IgniteRelWriter;
-import org.apache.ignite.internal.sql.engine.trait.DistributionFunction;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
@@ -117,30 +117,22 @@ public abstract class AbstractIgniteJoin extends Join
implements TraitsAwareIgni
/** {@inheritDoc} */
@Override
public List<Pair<RelTraitSet, List<RelTraitSet>>>
deriveDistribution(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
- // There are several rules:
- // 1) any join is possible on broadcast or single distribution
- // 2) hash distributed join is possible when join keys are superset of
source distribution keys
- // 3) hash and broadcast distributed tables can be joined when join
keys equal to hash
- // distributed table distribution keys and:
- // 3.1) it's a left join and a hash distributed table is at left
- // 3.2) it's a right join and a hash distributed table is at right
- // 3.3) it's an inner join, this case a hash distributed table
may be at any side
-
RelTraitSet left = inputTraits.get(0);
RelTraitSet right = inputTraits.get(1);
List<Pair<RelTraitSet, List<RelTraitSet>>> res = new ArrayList<>();
- final IgniteDistribution leftDistr = TraitUtils.distribution(left);
- final IgniteDistribution rightDistr = TraitUtils.distribution(right);
+ IgniteDistribution leftDistr = TraitUtils.distribution(left);
+ IgniteDistribution rightDistr = TraitUtils.distribution(right);
- final IgniteDistribution left2rightProjectedDistr =
leftDistr.apply(buildTransposeMapping(true));
- final IgniteDistribution right2leftProjectedDistr =
rightDistr.apply(buildTransposeMapping(false));
+ IgniteDistribution left2rightProjectedDistr =
leftDistr.apply(buildTransposeMapping(true));
+ IgniteDistribution right2leftProjectedDistr =
rightDistr.apply(buildTransposeMapping(false));
RelTraitSet outTraits;
RelTraitSet leftTraits;
RelTraitSet rightTraits;
+ // Any join is possible with single and broadcast distributions.
if (leftDistr == broadcast() && rightDistr == broadcast()) {
outTraits = nodeTraits.replace(broadcast());
leftTraits = left.replace(broadcast());
@@ -157,32 +149,75 @@ public abstract class AbstractIgniteJoin extends Join
implements TraitsAwareIgni
return List.copyOf(res);
}
- if (leftDistr.getType() == HASH_DISTRIBUTED &&
left2rightProjectedDistr != random()) {
- outTraits = nodeTraits.replace(leftDistr);
- leftTraits = left.replace(leftDistr);
- rightTraits = right.replace(left2rightProjectedDistr);
+ TargetMapping offsetByLeftSize = Commons.targetOffsetMapping(
+ this.right.getRowType().getFieldCount(),
+ this.left.getRowType().getFieldCount()
+ );
- res.add(Pair.of(outTraits, List.of(leftTraits, rightTraits)));
+ // Re-hashing of right side to left.
+ if (leftDistr.getType() == HASH_DISTRIBUTED &&
left2rightProjectedDistr != random()) {
+ computeHashOutputOptions(nodeTraits, left, leftDistr, right,
left2rightProjectedDistr, res, offsetByLeftSize);
}
+ // Re-hashing of left side to right.
if (rightDistr.getType() == HASH_DISTRIBUTED &&
right2leftProjectedDistr != random()) {
- outTraits = nodeTraits.replace(rightDistr);
- leftTraits = left.replace(right2leftProjectedDistr);
- rightTraits = right.replace(rightDistr);
+ computeHashOutputOptions(nodeTraits, left,
right2leftProjectedDistr, right, rightDistr, res, offsetByLeftSize);
+ }
+ // Full re-hashing by join keys.
+ computeHashOutputOptions(
+ nodeTraits, left, hash(joinInfo.leftKeys), right,
hash(joinInfo.rightKeys), res, offsetByLeftSize
+ );
+
+ return List.copyOf(res);
+ }
+
+ private void computeHashOutputOptions(
+ RelTraitSet nodeCurrent,
+ RelTraitSet leftCurrent,
+ IgniteDistribution newLeftDistribution,
+ RelTraitSet rightCurrent,
+ IgniteDistribution newRightDistribution,
+ List<Pair<RelTraitSet, List<RelTraitSet>>> res,
+ TargetMapping offsetByLeftSize
+ ) {
+ if (newLeftDistribution.getType() != HASH_DISTRIBUTED
+ || newRightDistribution.getType() != HASH_DISTRIBUTED) {
+ // Fail explicitly: silently proceeding with a non-hash distribution here could
+ // produce an incorrect plan which emits wrong results.
+ throw new AssertionError("Only hash-based distribution is
allowed");
+ }
+
+ RelTraitSet leftTraits = leftCurrent.replace(newLeftDistribution);
+ RelTraitSet rightTraits = rightCurrent.replace(newRightDistribution);
+
+ RelTraitSet outTraits;
+
+ // Depending on the join type, the output distribution follows the left keys, the right keys, or both.
+ // A side which may be NULL-padded, or is not projected at all (e.g. SEMI/ANTI), cannot define it.
+ if (shouldEmitLeftSideDistribution(joinType)) {
+ outTraits = nodeCurrent.replace(newLeftDistribution);
res.add(Pair.of(outTraits, List.of(leftTraits, rightTraits)));
}
- leftTraits = left.replace(hash(joinInfo.leftKeys,
DistributionFunction.hash()));
- rightTraits = right.replace(hash(joinInfo.rightKeys,
DistributionFunction.hash()));
+ if (shouldEmitRightSideDistribution(joinType)) {
+ outTraits =
nodeCurrent.replace(newRightDistribution.apply(offsetByLeftSize)); // Right-side key indices are shifted past the left-side columns of the join row.
+ res.add(Pair.of(outTraits, List.of(leftTraits, rightTraits)));
+ }
- outTraits = nodeTraits.replace(hash(joinInfo.leftKeys,
DistributionFunction.hash()));
- res.add(Pair.of(outTraits, List.of(leftTraits, rightTraits)));
+ // If neither side can define the output distribution (e.g. for FULL join), emit output with random distribution.
+ if (!shouldEmitLeftSideDistribution(joinType) &&
!shouldEmitRightSideDistribution(joinType)) {
+ outTraits = nodeCurrent.replace(random());
+ res.add(Pair.of(outTraits, List.of(leftTraits, rightTraits)));
+ }
+ }
- outTraits = nodeTraits.replace(hash(joinInfo.rightKeys,
DistributionFunction.hash()));
- res.add(Pair.of(outTraits, List.of(leftTraits, rightTraits)));
+ private static boolean shouldEmitLeftSideDistribution(JoinRelType
joinType) { // Left keys are usable unless the join may NULL-pad the left side.
+ return !joinType.generatesNullsOnLeft();
+ }
- return List.copyOf(res);
+ private static boolean shouldEmitRightSideDistribution(JoinRelType
joinType) { // Right keys are usable only if right columns are projected and never NULL-padded.
+ return joinType.projectsRight() && !joinType.generatesNullsOnRight();
}
/** {@inheritDoc} */
@@ -252,7 +287,7 @@ public abstract class AbstractIgniteJoin extends Join
implements TraitsAwareIgni
}
// We cannot provide random distribution without unique
constraint on join keys,
- // so, we require hash distribution (wich satisfies random
distribution) instead.
+ // so, we require hash distribution (which satisfies random
distribution) instead.
IgniteDistribution outDistr = distrType == HASH_DISTRIBUTED
? IgniteDistributions.clone(distribution,
joinInfo.leftKeys)
: hash(joinInfo.leftKeys);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteProject.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteProject.java
index 9b5c6d599fd..a84f56a3f03 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteProject.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteProject.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.rel;
import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
import static
org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable.RAND_UUID;
import static
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.broadcast;
+import static
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.random;
import static
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
import static
org.apache.ignite.internal.sql.engine.trait.TraitUtils.changeTraits;
@@ -232,10 +233,15 @@ public class IgniteProject extends Project implements
TraitsAwareIgniteRel {
RelOptCost cost = planner.getCostFactory().makeCost(rowCount, rowCount
* IgniteCost.ROW_PASS_THROUGH_COST, 0);
- if (distribution() == single()) {
- // make single distributed projection slightly more expensive to
help
- // planner prefer distributed option, if exists
+ // Small adjustment to cost to:
+ // 1) help optimizer to choose distributed plan over single
+ // 2) make result stable when several equivalent options exists,
+ // but projection convert one of them to random
+ if (distribution() == random()) {
cost = cost.plus(planner.getCostFactory().makeTinyCost());
+ } else if (distribution() == single()) {
+ // Between single and random the later should be preferable.
+ cost =
cost.plus(planner.getCostFactory().makeTinyCost().multiplyBy(2));
}
return cost;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
index bbc41b8432e..e7feebf3a63 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
@@ -27,7 +27,6 @@ import static
org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.util.CollectionUtils.first;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
-import com.google.common.collect.ImmutableList;
import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntList;
import java.lang.reflect.Proxy;
@@ -220,21 +219,6 @@ public class TraitUtils {
return traits.getTrait(DistributionTraitDef.INSTANCE);
}
- /**
- * Check distribution definition in traits.
- *
- * @param traitDefs Traits to analyze.
- * @return {@code true} if distribution found, {@code false} otherwise.
- */
- public static boolean distributionPresent(ImmutableList<RelTraitDef>
traitDefs) {
- for (RelTraitDef<?> trait : traitDefs) {
- if (trait instanceof DistributionTraitDef) {
- return true;
- }
- }
- return false;
- }
-
/**
* Collation. TODO Documentation
https://issues.apache.org/jira/browse/IGNITE-15859
*/
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index f571e186a73..b803673180c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -842,4 +842,15 @@ public final class Commons {
}
return true;
}
+
+ /**
+ * Creates {@link Mappings.TargetMapping} that maps every source index in range {@code [0, sourceCount)}
+ * to the target shifted by {@code offset}, i.e. {@code target = source + offset}.
+ */
+ public static TargetMapping targetOffsetMapping(int sourceCount, int
offset) {
+ return Mappings.offsetTarget(
+ Mappings.createIdentity(sourceCount),
+ offset
+ );
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
index 1534e8d4987..ce7775f325d 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/IdentityDistributionPlannerTest.java
@@ -54,7 +54,12 @@ public class IdentityDistributionPlannerTest extends
AbstractPlannerTest {
assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
.and(nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
- .and(hasDistribution(IgniteDistributions.identity(0)))
+ .and(hasDistribution(IgniteDistributions.identity(0))
+ // This is projection of complexTbl
distribution on the right side of join.
+ // That is, for this equi-join, distribution
might be either equal to one on the left side
+ // or to its counterpart derived as projection
of distribution keys of left side through
+ // join pairs on right side.
+
.or(hasDistribution(IgniteDistributions.identity(1))))
.and(input(0, isInstanceOf(IgniteIndexScan.class)))
.and(input(1, isInstanceOf(IgniteIndexScan.class)))
))
@@ -131,7 +136,12 @@ public class IdentityDistributionPlannerTest extends
AbstractPlannerTest {
assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
.and(nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
- .and(hasDistribution(IgniteDistributions.identity(0)))
+ .and(hasDistribution(IgniteDistributions.identity(0))
+ // This is projection of complexTbl
distribution on the right side of join.
+ // That is, for this equi-join, distribution
might be either equal to one on the left side
+ // or to its counterpart derived as projection
of distribution keys of left side through
+ // join pairs on right side.
+
.or(hasDistribution(IgniteDistributions.identity(1))))
.and(input(0, isInstanceOf(IgniteIndexScan.class)))
.and(input(1, isInstanceOf(IgniteIndexScan.class)))
))
@@ -154,7 +164,12 @@ public class IdentityDistributionPlannerTest extends
AbstractPlannerTest {
assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
.and(nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
- .and(hasDistribution(IgniteDistributions.identity(0)))
+ .and(hasDistribution(IgniteDistributions.identity(0))
+ // This is projection of complexTbl
distribution on the right side of join.
+ // That is, for this equi-join, distribution
might be either equal to one on the left side
+ // or to its counterpart derived as projection
of distribution keys of left side through
+ // join pairs on right side.
+
.or(hasDistribution(IgniteDistributions.identity(1))))
.and(input(0, isInstanceOf(IgniteIndexScan.class)))
.and(input(1, isInstanceOf(IgniteTrimExchange.class)
.and(input(isInstanceOf(IgniteIndexScan.class)))
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
index 1d36e7dcdef..f9f976af8be 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
@@ -28,7 +28,9 @@ import java.util.List;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.mapping.Mappings.TargetMapping;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import
org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
import org.apache.ignite.internal.sql.engine.rel.AbstractIgniteJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
@@ -39,7 +41,10 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.type.NativeTypes;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -190,8 +195,9 @@ public class JoinColocationPlannerTest extends
AbstractPlannerTest {
*/
@Test
public void joinComplexToSimpleAff() throws Exception {
+ int tableId = nextTableId();
IgniteTable complexTbl = complexTbl("COMPLEX_TBL", 2 *
DEFAULT_TBL_SIZE,
- TestBuilders.affinity(ImmutableIntList.of(0, 1),
nextTableId(), DEFAULT_ZONE_ID));
+ TestBuilders.affinity(ImmutableIntList.of(0, 1), tableId,
DEFAULT_ZONE_ID));
IgniteTable simpleTbl = simpleTable("SIMPLE_TBL", DEFAULT_TBL_SIZE);
@@ -202,7 +208,12 @@ public class JoinColocationPlannerTest extends
AbstractPlannerTest {
+ "join SIMPLE_TBL t2 on t1.id1 = t2.id and t1.id2 = t2.id2";
assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
- .and(hasDistribution(complexTbl.distribution()))
+ .and(hasDistribution(complexTbl.distribution())
+ // This is projection of complexTbl distribution on
the right side of join.
+ // That is, for this equi-join, distribution might be
either equal to one on the left side
+ // or to its counterpart derived as projection of
distribution keys of left side through
+ // join pairs on right side.
+
.or(hasDistribution(TestBuilders.affinity(ImmutableIntList.of(2, 3), tableId,
DEFAULT_ZONE_ID))))
.and(input(0, isInstanceOf(IgniteIndexScan.class)
.and(scan ->
complexTbl.equals(scan.getTable().unwrap(IgniteTable.class)))
))
@@ -216,6 +227,168 @@ public class JoinColocationPlannerTest extends
AbstractPlannerTest {
), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin",
"HashJoinConverter");
}
+ // Test cases in this test may produce several equivalent plans.
+ // The choice is not deterministic because, for equivalent options, the order
+ // is sometimes determined by the iteration order of a hash-based collection
+ // (like HashMap). Repeat this test a few times to cover as many variants as
+ // possible. This test is relatively fast.
+ @RepeatedTest(10)
+ void joinOutputDistribution() throws Exception {
+ int bigTableId = nextTableId();
+ IgniteTable bigTable = table(
+ "BIG_TBL", 2 * DEFAULT_TBL_SIZE, bigTableId,
+ "ID", "C1", "C2", "C3"
+ );
+ int smallTableId = nextTableId();
+ IgniteTable smallTable = table(
+ "SMALL_TBL", DEFAULT_TBL_SIZE, smallTableId,
+ "ID", "C1", "C2", "C3", "C4"
+ );
+
+ IgniteSchema schema = createSchema(bigTable, smallTable);
+
+ {
+ // Case: Inner join, big table on the left.
+ // Expected output distribution is either the original one of the big table, or
+ // the projection of the big table distribution to the right side.
+ String sql = "SELECT /*+ enforce_join_order */ "
+ + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 +
st.c2 + st.c3 + st.c4) "
+ + "FROM big_tbl bt "
+ + "JOIN small_tbl st ON bt.id = st.c4";
+
+ assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+ .and(hasDistribution(bigTable.distribution())
+
.or(hasDistribution(TestBuilders.affinity(ImmutableIntList.of(
+ 4 /* index of C4 column in small_table */
+ +
bigTable.descriptor().rowTypeSansHidden().getFieldCount()
+ ), bigTableId, DEFAULT_ZONE_ID)))
+ )
+ ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin",
"HashJoinConverter");
+ }
+
+ {
+ // Case: Inner join, big table on the right.
+ // Expected output distribution is either the big table distribution shifted
+ // by the size of the small table, or the projection of the big table distribution to
+ // the left side.
+ String sql = "SELECT /*+ enforce_join_order */ "
+ + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 +
st.c2 + st.c3 + st.c4) "
+ + "FROM small_tbl st "
+ + "JOIN big_tbl bt ON bt.id = st.c4";
+
+ TargetMapping offsetMapping = Commons.targetOffsetMapping(
+ bigTable.descriptor().rowTypeSansHidden().getFieldCount(),
+ smallTable.descriptor().rowTypeSansHidden().getFieldCount()
+ );
+
+ assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+
.and(hasDistribution(bigTable.distribution().apply(offsetMapping))
+
.or(hasDistribution(TestBuilders.affinity(ImmutableIntList.of(
+ 4 /* index of C4 column in small_table */
+ ), bigTableId, DEFAULT_ZONE_ID)))
+ )
+ ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin",
"HashJoinConverter");
+ }
+
+ {
+ // Case: Left join, big table on the left.
+ // Expected output distribution is only the original one of the big table. Projection
+ // of the big table distribution is not expected since left join may emit NULLs on the right side.
+ String sql = "SELECT /*+ enforce_join_order */ "
+ + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 +
st.c2 + st.c3 + st.c4) "
+ + "FROM big_tbl bt "
+ + "LEFT JOIN small_tbl st ON bt.id = st.c4";
+
+ assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+ .and(hasDistribution(bigTable.distribution()))
+ ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin",
"HashJoinConverter");
+ }
+
+ {
+ // Case: Left join, big table on the right.
+ // Expected output distribution is only the projection of the big table distribution to
+ // the left side. The big table distribution shifted by the size of the small
+ // table is not expected since left join may emit NULLs on the right side.
+ String sql = "SELECT /*+ enforce_join_order */ "
+ + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 +
st.c2 + st.c3 + st.c4) "
+ + "FROM small_tbl st "
+ + "LEFT JOIN big_tbl bt ON bt.id = st.c4";
+
+ assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+
.and(hasDistribution(TestBuilders.affinity(ImmutableIntList.of(
+ 4 /* index of C4 column in small_table */
+ ), bigTableId, DEFAULT_ZONE_ID))
+ )
+ ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin",
"HashJoinConverter");
+ }
+
+ {
+ // Case: Right join, big table on the left.
+ // Expected output distribution is only the projection of the big table distribution to
+ // the right side. The big table distribution itself is not expected since right
+ // join may emit NULLs on the left side.
+ String sql = "SELECT /*+ enforce_join_order */ "
+ + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 +
st.c2 + st.c3 + st.c4) "
+ + "FROM big_tbl bt "
+ + "RIGHT JOIN small_tbl st ON bt.id = st.c4";
+
+ assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+
.and(hasDistribution(TestBuilders.affinity(ImmutableIntList.of(
+ 4 /* index of C4 column in small_table */
+ +
bigTable.descriptor().rowTypeSansHidden().getFieldCount()
+ ), bigTableId, DEFAULT_ZONE_ID))
+ )
+ ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin",
"HashJoinConverter");
+ }
+
+ {
+ // Case: Right join, big table on the right.
+ // Here we expect the smaller table to be sent to the bigger one.
+ // Hence, the expected distribution is only the big table distribution shifted
+ // by the size of the small table. Projection of the big table distribution is not
+ // expected since right join may emit NULLs on the left side.
+ String sql = "SELECT /*+ enforce_join_order */ "
+ + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 +
st.c2 + st.c3 + st.c4) "
+ + "FROM small_tbl st "
+ + "RIGHT JOIN big_tbl bt ON bt.id = st.c4";
+
+ TargetMapping offsetMapping = Commons.targetOffsetMapping(
+ bigTable.descriptor().rowTypeSansHidden().getFieldCount(),
+ smallTable.descriptor().rowTypeSansHidden().getFieldCount()
+ );
+
+ assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+
.and(hasDistribution(bigTable.distribution().apply(offsetMapping)))
+ ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin",
"HashJoinConverter");
+ }
+
+ {
+ // Case: Full outer join, big table on the left.
+ // Expected output distribution is random since full outer join may emit NULLs on both sides.
+ String sql = "SELECT /*+ enforce_join_order */ "
+ + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 +
st.c2 + st.c3 + st.c4) "
+ + "FROM big_tbl bt "
+ + "FULL OUTER JOIN small_tbl st ON bt.id = st.c4";
+
+ assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+ .and(hasDistribution(IgniteDistributions.random()))
+ ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin",
"HashJoinConverter");
+ }
+
+ {
+ // Case: Full outer join, big table on the right.
+ // Expected output distribution is random since full outer join may emit NULLs on both sides.
+ String sql = "SELECT /*+ enforce_join_order */ "
+ + " COUNT(bt.id + bt.c1 + bt.c2 + bt.c3 + st.id + st.c1 +
st.c2 + st.c3 + st.c4) "
+ + "FROM small_tbl st "
+ + "FULL OUTER JOIN big_tbl bt ON bt.id = st.c4";
+
+ assertPlan(sql, schema,
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class)
+ .and(hasDistribution(IgniteDistributions.random()))
+ ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin",
"HashJoinConverter");
+ }
+ }
+
/**
* Re-hashing for complex affinity.
*
@@ -304,4 +477,21 @@ public class JoinColocationPlannerTest extends
AbstractPlannerTest {
.end()
.build();
}
+
+ private static IgniteTable table(String tableName, int size, int tableId,
String... columnNames) { // Builds a test table whose first column is both the affinity key and the "PK" index column.
+ TableBuilder builder = TestBuilders.table()
+ .name(tableName)
+ .size(size)
+ .distribution(TestBuilders.affinity(ImmutableIntList.of(0),
tableId, DEFAULT_ZONE_ID)) // affinity by the first column
+ .sortedIndex()
+ .name("PK")
+ .addColumn(columnNames[0], Collation.ASC_NULLS_LAST) // sorted index over the first column
+ .end();
+ // Every column, including the first one, is declared as INT32.
+ for (String name : columnNames) {
+ builder.addColumn(name, NativeTypes.INT32);
+ }
+
+ return builder.build();
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
index 9062e14ab03..c5e742af1e3 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.sql.engine.util;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -33,10 +35,13 @@ import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.mapping.Mapping;
import org.apache.calcite.util.mapping.Mappings;
+import org.apache.calcite.util.mapping.Mappings.TargetMapping;
import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mockito;
/**
@@ -140,6 +145,23 @@ public class CommonsTest extends BaseIgniteAbstractTest {
assertEquals(lt, project4.getRowType(), "Invalid types in projection
for node4");
}
+ @ParameterizedTest
+ @CsvSource({
+ "3, 3",
+ "5, 5",
+ "2, 7",
+ })
+ void targetOffset(int sourceSize, int offset) {
+ TargetMapping mapping = Commons.targetOffsetMapping(sourceSize,
offset);
+ // Every source index in [0, sourceSize) must be mapped to (source + offset).
+ for (int i = 0; i < sourceSize; i++) {
+ assertThat(
+ "Source <" + i + "> should be shifted by offset <" +
offset + ">",
+ mapping.getTarget(i), is(i + offset)
+ );
+ }
+ }
+
private static void expectMapped(Mapping mapping, ImmutableBitSet bitSet,
ImmutableBitSet expected) {
assertEquals(expected, Mappings.apply(mapping, bitSet), "direct
mapping");
diff --git a/modules/sql-engine/src/test/resources/tpch/plan/q13.plan
b/modules/sql-engine/src/test/resources/tpch/plan/q13.plan
index bf033bbdb25..c3737692cd5 100644
--- a/modules/sql-engine/src/test/resources/tpch/plan/q13.plan
+++ b/modules/sql-engine/src/test/resources/tpch/plan/q13.plan
@@ -6,34 +6,34 @@ Sort
group: [C_COUNT]
aggregation: [COUNT()]
est: (rows=75000)
- Project
- fieldNames: [C_COUNT]
- projection: [EXPR$1]
+ Exchange
+ distribution: single
est: (rows=75000)
- ColocatedHashAggregate
- fieldNames: [C_CUSTKEY, EXPR$1]
- group: [C_CUSTKEY]
- aggregation: [COUNT(O_ORDERKEY)]
+ Project
+ fieldNames: [C_COUNT]
+ projection: [EXPR$1]
est: (rows=75000)
- Project
- fieldNames: [C_CUSTKEY, O_ORDERKEY]
- projection: [C_CUSTKEY, O_ORDERKEY]
- est: (rows=375000)
- HashJoin
- predicate: =(C_CUSTKEY, O_CUSTKEY)
- type: right
+ ColocatedHashAggregate
+ fieldNames: [C_CUSTKEY, EXPR$1]
+ group: [C_CUSTKEY]
+ aggregation: [COUNT(O_ORDERKEY)]
+ est: (rows=75000)
+ Project
+ fieldNames: [C_CUSTKEY, O_ORDERKEY]
+ projection: [C_CUSTKEY, O_ORDERKEY]
est: (rows=375000)
- Exchange
- distribution: single
+ HashJoin
+ predicate: =(C_CUSTKEY, O_CUSTKEY)
+ type: right
est: (rows=375000)
- TableScan
- table: PUBLIC.ORDERS
- predicate: NOT(LIKE(O_COMMENT, _UTF-8'%special%requests%'))
- fieldNames: [O_ORDERKEY, O_CUSTKEY, O_COMMENT]
+ Exchange
+ distribution: table PUBLIC.CUSTOMER in zone "Default" by
[O_CUSTKEY]
est: (rows=375000)
- Exchange
- distribution: single
- est: (rows=150000)
+ TableScan
+ table: PUBLIC.ORDERS
+ predicate: NOT(LIKE(O_COMMENT, _UTF-8'%special%requests%'))
+ fieldNames: [O_ORDERKEY, O_CUSTKEY, O_COMMENT]
+ est: (rows=375000)
TableScan
table: PUBLIC.CUSTOMER
fieldNames: [C_CUSTKEY]