This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 525dfdb  pending
525dfdb is described below

commit 525dfdbd9e2ec1a3b250f22c5b420e8e0ef27a9b
Author: Igor Seliverstov <gvvinbl...@gmail.com>
AuthorDate: Mon Nov 11 20:08:56 2019 +0300

    pending
---
 .../calcite/metadata/IgniteMdDistribution.java     |  10 +-
 .../query/calcite/rule/IgniteFilterRule.java       |   2 +-
 .../query/calcite/rule/IgniteHashJoinRule.java     |  81 ++++++++++-
 .../query/calcite/rule/IgniteProjectRule.java      |   2 +-
 .../processors/query/calcite/rule/IgniteRules.java |   4 +-
 .../query/calcite/trait/IgniteDistributions.java   |  21 +++
 .../processors/query/calcite/util/Commons.java     |  13 ++
 .../query/calcite/CalciteQueryProcessorTest.java   | 160 ++++++---------------
 8 files changed, 164 insertions(+), 129 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
index b2f91a9..d58902a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
@@ -37,11 +37,13 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexSlot;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
 
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.BROADCAST;
 import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
 
 /**
@@ -76,6 +78,10 @@ public class IgniteMdDistribution implements 
MetadataHandler<IgniteMetadata.Dist
         return rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
     }
 
+    /**
+     * Distribution metadata for a table scan: the trait is read straight from the scan's own trait set.
+     *
+     * @param rel Table scan node.
+     * @param mq Metadata query; not used by this method.
+     * @return Value of the {@link DistributionTraitDef#INSTANCE} trait found in the trait set of {@code rel}.
+     */
+    public DistributionTrait getDistributionTrait(IgniteTableScan rel, 
RelMetadataQuery mq) {
+        return rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
+    }
+
     public static DistributionTrait project(RelMetadataQuery mq, RelNode 
input, List<RexNode> projects) {
         DistributionTrait trait = distribution(input, mq);
 
@@ -120,7 +126,9 @@ public class IgniteMdDistribution implements 
MetadataHandler<IgniteMetadata.Dist
     }
 
     public static DistributionTrait join(RelMetadataQuery mq, RelNode left, 
RelNode right, RexNode condition) {
-        return distribution(left, mq);
+        DistributionTrait leftDist = distribution(left, mq);
+
+        return leftDist.type() != BROADCAST ? leftDist : distribution(right, 
mq);
     }
 
     public static DistributionTrait distribution(RelNode rel, RelMetadataQuery 
mq) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java
index 9a7606c..2ec33a06 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java
@@ -49,6 +49,6 @@ public class IgniteFilterRule extends RelOptRule {
         RelOp<LogicalFilter, Boolean> transformOp = 
Commons.transformSubset(call, converted, IgniteFilter::create);
 
         if (!transformOp.go(filter))
-            call.transformTo(IgniteFilter.create(filter, converted));
+            call.transformTo(LogicalFilter.create(converted, 
filter.getCondition()));
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteHashJoinRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteHashJoinRule.java
index cc17c24..5bb537f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteHashJoinRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteHashJoinRule.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rule;
 
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Objects;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTraitSet;
@@ -27,10 +30,15 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.BROADCAST;
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.SINGLE;
+
 /**
  *
  */
@@ -38,25 +46,59 @@ public class IgniteHashJoinRule extends RelOptRule {
     public static final RelOptRule INSTANCE = new IgniteHashJoinRule();
 
     public IgniteHashJoinRule() {
-        super(Commons.any(LogicalJoin.class, RelNode.class), 
RelFactories.LOGICAL_BUILDER, "IgniteJoinRule");
+        super(Commons.any(LogicalJoin.class, RelNode.class), 
RelFactories.LOGICAL_BUILDER, "IgniteHashJoinRule");
     }
 
     @Override public void onMatch(RelOptRuleCall call) {
         LogicalJoin join = call.rel(0);
 
         RelTraitSet leftTraits = join.getLeft().getTraitSet()
-            .replace(IgniteRel.IGNITE_CONVENTION)
-            
.replace(IgniteDistributions.hash(join.analyzeCondition().leftKeys, 
IgniteDistributions.noOpFunction()));
+            .replace(IgniteRel.IGNITE_CONVENTION);
 
         RelTraitSet rightTraits = join.getRight().getTraitSet()
-            .replace(IgniteRel.IGNITE_CONVENTION)
-            
.replace(IgniteDistributions.hash(join.analyzeCondition().rightKeys, 
IgniteDistributions.noOpFunction()));
+            .replace(IgniteRel.IGNITE_CONVENTION);
 
         RelNode left = convert(join.getLeft(), leftTraits);
         RelNode right = convert(join.getRight(), rightTraits);
 
         RelMetadataQuery mq = call.getMetadataQuery();
 
+        List<DistributionTrait> leftDerived;
+        List<DistributionTrait> rightDerived;
+
+        if ((leftDerived = IgniteDistributions.deriveDistributions(left, 
mq)).isEmpty()
+            || (rightDerived = IgniteDistributions.deriveDistributions(right, 
mq)).isEmpty()) {
+            call.transformTo(join.copy(join.getTraitSet(), 
ImmutableList.of(left, right)));
+
+            return;
+        }
+
+        List<DistributionTrait> leftDists = Commons.concat(leftDerived,
+            IgniteDistributions.hash(join.analyzeCondition().leftKeys, 
IgniteDistributions.hashFunction()));
+
+        List<DistributionTrait> rightDists = Commons.concat(rightDerived,
+            IgniteDistributions.hash(join.analyzeCondition().rightKeys, 
IgniteDistributions.hashFunction()));
+
+        for (DistributionTrait leftDist0 : leftDists) {
+            for (DistributionTrait rightDist0 : rightDists) {
+                if (canTransform(join, leftDist0, rightDist0))
+                    transform(call, join, mq, leftDist0, rightDist0);
+            }
+        }
+    }
+
+    private void transform(RelOptRuleCall call, LogicalJoin join, 
RelMetadataQuery mq, DistributionTrait leftDist, DistributionTrait rightDist) {
+        RelTraitSet leftTraits = join.getLeft().getTraitSet()
+            .replace(IgniteRel.IGNITE_CONVENTION)
+            .replace(leftDist);
+
+        RelTraitSet rightTraits = join.getRight().getTraitSet()
+            .replace(IgniteRel.IGNITE_CONVENTION)
+            .replace(rightDist);
+
+        RelNode left = convert(join.getLeft(), leftTraits);
+        RelNode right = convert(join.getRight(), rightTraits);
+
         RelTraitSet traitSet = join.getTraitSet()
             .replace(IgniteRel.IGNITE_CONVENTION)
             .replaceIf(DistributionTraitDef.INSTANCE, () -> 
IgniteMdDistribution.join(mq, left, right, join.getCondition()));
@@ -64,4 +106,33 @@ public class IgniteHashJoinRule extends RelOptRule {
         call.transformTo(new IgniteHashJoin(join.getCluster(), traitSet, left, 
right,
             join.getCondition(), join.getVariablesSet(), join.getJoinType(), 
join.isSemiJoinDone()));
     }
+
+    /**
+     * Tells whether the given pair of input distributions is an acceptable combination for the join.
+     * <p>
+     * Accepted combinations:
+     * <ul>
+     *     <li>both inputs are BROADCAST;</li>
+     *     <li>both inputs are SINGLE;</li>
+     *     <li>one input is BROADCAST and the other one is HASH on its side of the join keys;</li>
+     *     <li>both inputs are HASH on their sides of the join keys and use equal destination function factories.</li>
+     * </ul>
+     *
+     * @param join Join being converted; its condition supplies the left and right join keys.
+     * @param leftDist Candidate distribution of the left input.
+     * @param rightDist Candidate distribution of the right input.
+     * @return {@code true} if the combination is one of the accepted ones, {@code false} otherwise.
+     */
+    private boolean canTransform(LogicalJoin join, DistributionTrait leftDist, DistributionTrait rightDist) {
+        boolean leftBroadcast = leftDist.type() == BROADCAST;
+        boolean rightBroadcast = rightDist.type() == BROADCAST;
+
+        if (leftBroadcast && rightBroadcast)
+            return true;
+
+        if (leftDist.type() == SINGLE && rightDist.type() == SINGLE)
+            return true;
+
+        // A side "matches" when it is hash-distributed exactly by its own join keys.
+        // The type check goes first, so the join condition is analyzed only for HASH distributions.
+        boolean leftMatches = leftDist.type() == HASH
+            && Objects.equals(leftDist.keys(), join.analyzeCondition().leftKeys);
+
+        boolean rightMatches = rightDist.type() == HASH
+            && Objects.equals(rightDist.keys(), join.analyzeCondition().rightKeys);
+
+        if (leftBroadcast)
+            return rightMatches;
+
+        if (rightBroadcast)
+            return leftMatches;
+
+        return leftMatches && rightMatches
+            && Objects.equals(rightDist.destinationFunctionFactory(), leftDist.destinationFunctionFactory());
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java
index 0ea3a40..4c2993e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java
@@ -49,6 +49,6 @@ public class IgniteProjectRule extends RelOptRule {
         RelOp<LogicalProject, Boolean> transformOp = 
Commons.transformSubset(call, converted, IgniteProject::create);
 
         if (!transformOp.go(project))
-            call.transformTo(IgniteProject.create(project, converted));
+            call.transformTo(LogicalProject.create(converted, 
project.getProjects(), project.getRowType()));
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
index 53ce878..82f4960 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
@@ -155,14 +155,14 @@ public class IgniteRules {
         SubQueryRemoveRule.PROJECT,
         SubQueryRemoveRule.JOIN);
 
-    public static final List<RelOptRule> IGNITE_LOGICAL_RULES = 
ImmutableList.of(
+    public static final List<RelOptRule> IGNITE_RULES = ImmutableList.of(
         IgniteFilterRule.INSTANCE,
         IgniteProjectRule.INSTANCE,
         IgniteHashJoinRule.INSTANCE);
 
     public static List<RelOptRule> logicalRules(Context ctx) {
         return ImmutableList.<RelOptRule>builder()
-            .addAll(IGNITE_LOGICAL_RULES)
+            .addAll(IGNITE_RULES)
             .add(AbstractConverter.ExpandConversionRule.INSTANCE)
             .build();
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 8f83227..57ccaf8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -16,12 +16,18 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.ToIntFunction;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.cluster.ClusterNode;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
 
 import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
 
@@ -107,4 +113,19 @@ public class IgniteDistributions {
     public static DestinationFunctionFactory hashFunction() {
         return HASH_FACTORY;
     }
+
+    /**
+     * Collects the distinct distributions that can be derived for the given node.
+     * <p>
+     * For a {@link RelSubset} the result is the union of the distributions derived for each rel returned
+     * by {@code getRels()}; the order of the returned items is unspecified because they are gathered in a hash set.
+     * For any other node the distribution is requested through {@link IgniteMdDistribution#distribution}
+     * and dropped if its type is {@link DistributionType#ANY}.
+     *
+     * @param rel Node (or subset of equivalent nodes) to inspect.
+     * @param mq Metadata query passed to the distribution lookup.
+     * @return Derived distributions, or an empty list if none could be derived.
+     */
+    public static List<DistributionTrait> deriveDistributions(RelNode rel, RelMetadataQuery mq) {
+        if (rel instanceof RelSubset) {
+            HashSet<DistributionTrait> collected = new HashSet<>();
+
+            for (RelNode member : ((RelSubset) rel).getRels())
+                collected.addAll(deriveDistributions(member, mq));
+
+            if (collected.isEmpty())
+                return Collections.emptyList();
+
+            return new ArrayList<>(collected);
+        }
+
+        DistributionTrait derived = IgniteMdDistribution.distribution(rel, mq);
+
+        if (derived.type() == DistributionType.ANY)
+            return Collections.emptyList();
+
+        return Collections.singletonList(derived);
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index ea3ecc3..671bc94 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -222,4 +222,17 @@ public final class Commons {
     private static int[] ensureSize(int[] array, int size) {
         return size < array.length ? array : Arrays.copyOf(array, 
U.ceilPow2(size));
     }
+
+    /**
+     * Creates a new list holding all items of the given list followed by the given extra elements.
+     *
+     * @param col Source list; it is copied and never modified.
+     * @param elements Elements to append after the items of {@code col}.
+     * @param <T> Element type.
+     * @return New mutable list, independent from {@code col}.
+     */
+    @SafeVarargs
+    public static <T> List<T> concat(List<T> col, T... elements) {
+        // The varargs array is only read here, never stored or exposed, so @SafeVarargs is legitimate.
+        List<T> res = new ArrayList<>(col.size() + elements.length);
+
+        res.addAll(col);
+        Collections.addAll(res, elements);
+
+        return res;
+    }
+
+//    public String explainToString(RelNode node) {
+//        RelWriterImpl
+//    }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index e6dc5c7..c5140c1 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -55,7 +55,6 @@ import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -263,119 +262,6 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
     }
 
     @Test
-    public void testVolcanoPlanerLocal() throws Exception {
-        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
-            "FROM PUBLIC.Developer d JOIN (" +
-            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
-            ") p " +
-            "ON d.projectId = p.id0 " +
-            "WHERE (d.projectId + 1) > ?";
-
-        Context ctx = proc.context(Contexts.of(schema, registry, 
AffinityTopologyVersion.NONE), sql, new Object[]{2});
-
-        assertNotNull(ctx);
-
-        RelTraitDef[] traitDefs = {
-            ConventionTraitDef.INSTANCE
-        };
-
-        RelRoot relRoot;
-
-        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
-            assertNotNull(planner);
-
-            Query query = ctx.unwrap(Query.class);
-
-            assertNotNull(planner);
-
-            // Parse
-            SqlNode sqlNode = planner.parse(query.sql());
-
-            // Validate
-            sqlNode = planner.validate(sqlNode);
-
-            // Convert to Relational operators graph
-            relRoot = planner.rel(sqlNode);
-
-            RelNode rel = relRoot.rel;
-
-            // Transformation chain
-
-            RelTraitSet desired = rel.getCluster().traitSet()
-                .replace(IgniteRel.IGNITE_CONVENTION)
-                .replace(IgniteDistributions.single())
-                .simplify();
-
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, 
rel, desired);
-
-            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
-        }
-
-        assertNotNull(relRoot.rel);
-    }
-
-    @Test
-    public void testSplitterLocal() throws Exception {
-        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
-            "FROM PUBLIC.Developer d JOIN (" +
-            "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
-            ") p " +
-            "ON d.projectId = p.id0 " +
-            "WHERE (d.projectId + 1) > ?";
-
-        Context ctx = proc.context(Contexts.of(schema, registry, 
AffinityTopologyVersion.NONE), sql, new Object[]{2});
-
-        assertNotNull(ctx);
-
-        RelTraitDef[] traitDefs = {
-            ConventionTraitDef.INSTANCE
-        };
-
-        RelRoot relRoot;
-
-        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
-            assertNotNull(planner);
-
-            Query query = ctx.unwrap(Query.class);
-
-            assertNotNull(planner);
-
-            // Parse
-            SqlNode sqlNode = planner.parse(query.sql());
-
-            // Validate
-            sqlNode = planner.validate(sqlNode);
-
-            // Convert to Relational operators graph
-            relRoot = planner.rel(sqlNode);
-
-            RelNode rel = relRoot.rel;
-
-            // Transformation chain
-            rel = planner.transform(PlannerType.HEP, 
PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
-
-            RelTraitSet desired = rel.getCluster().traitSet()
-                .replace(IgniteRel.IGNITE_CONVENTION)
-                .replace(IgniteDistributions.single())
-                .simplify();
-
-            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, 
rel, desired);
-
-            relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
-        }
-
-        assertNotNull(relRoot);
-
-        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
-
-        assertNotNull(plan);
-
-        plan.init(ctx);
-
-        assertNotNull(plan);
-    }
-
-    @Test
     public void testSplitterCollocatedPartitionedPartitioned() throws 
Exception {
         String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
             "FROM PUBLIC.Developer d JOIN (" +
@@ -440,7 +326,6 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
     }
 
     @Test
-    @Ignore("Need to request broadcast trait as a variant for left inner join 
sub-tree.")
     public void testSplitterCollocatedReplicatedReplicated() throws Exception {
         String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
             "FROM PUBLIC.Developer d JOIN (" +
@@ -521,7 +406,6 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
     }
 
     @Test
-    @Ignore("Need to request broadcast trait as a variant for left inner join 
sub-tree.")
     public void testSplitterCollocatedReplicatedAndPartitioned() throws 
Exception {
         String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
             "FROM PUBLIC.Developer d JOIN (" +
@@ -618,6 +502,30 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
             "ON d.projectId = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
+        TestRegistry registry = new TestRegistry(){
+            @Override public DistributionTrait distribution(int cacheId, 
RowType rowType) {
+                if (cacheId == CU.cacheId("Project"))
+                    return IgniteDistributions.broadcast();
+
+                return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.hashFunction());
+            }
+
+            @Override public Location distributed(int cacheId, 
AffinityTopologyVersion topVer) {
+                if (cacheId == CU.cacheId("Developer"))
+                    return new Location(null, Arrays.asList(
+                        select(nodes, 1),
+                        select(nodes, 2),
+                        select(nodes, 2),
+                        select(nodes, 0),
+                        select(nodes, 1)
+                    ), Location.HAS_PARTITIONED_CACHES);
+                if (cacheId == CU.cacheId("Project"))
+                    return new Location(select(nodes, 0,1), null, 
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+
+                throw new AssertionError("Unexpected cache id:" + cacheId);
+            }
+        };
+
         Context ctx = proc.context(Contexts.of(schema, registry, 
AffinityTopologyVersion.NONE), sql, new Object[]{2});
 
         assertNotNull(ctx);
@@ -682,6 +590,22 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
             "ON d.projectId = p.ver0 " +
             "WHERE (d.projectId + 1) > ?";
 
+        TestRegistry registry = new TestRegistry(){
+            @Override public DistributionTrait distribution(int cacheId, 
RowType rowType) {
+                return IgniteDistributions.broadcast();
+            }
+
+            @Override public Location distributed(int cacheId, 
AffinityTopologyVersion topVer) {
+                if (cacheId == CU.cacheId("Developer"))
+                    return new Location(select(nodes, 2), null, 
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+
+                else if (cacheId == CU.cacheId("Project"))
+                    return new Location(select(nodes, 0,1), null, 
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+
+                throw new AssertionError("Unexpected cache id:" + cacheId);
+            }
+        };
+
         Context ctx = proc.context(Contexts.of(schema, registry, 
AffinityTopologyVersion.NONE), sql, new Object[]{2});
 
         assertNotNull(ctx);
@@ -734,11 +658,10 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
 
         assertNotNull(plan);
 
-        assertTrue(plan.fragments().size() == 4);
+        assertTrue(plan.fragments().size() == 3);
     }
 
     @Test
-    @Ignore("Need to request broadcast trait as a variant for left inner join 
sub-tree.")
     public void testSplitterPartiallyReplicated1() throws Exception {
         String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
             "FROM PUBLIC.Developer d JOIN (" +
@@ -828,7 +751,6 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
     }
 
     @Test
-    @Ignore("Need to request broadcast trait as a variant for left inner join 
sub-tree.")
     public void testSplitterPartiallyReplicated2() throws Exception {
         String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
             "FROM PUBLIC.Developer d JOIN (" +

Reply via email to