This is an automated email from the ASF dual-hosted git repository. gvvinblade pushed a commit to branch ignite-12248 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push: new 525dfdb pending 525dfdb is described below commit 525dfdbd9e2ec1a3b250f22c5b420e8e0ef27a9b Author: Igor Seliverstov <gvvinbl...@gmail.com> AuthorDate: Mon Nov 11 20:08:56 2019 +0300 pending --- .../calcite/metadata/IgniteMdDistribution.java | 10 +- .../query/calcite/rule/IgniteFilterRule.java | 2 +- .../query/calcite/rule/IgniteHashJoinRule.java | 81 ++++++++++- .../query/calcite/rule/IgniteProjectRule.java | 2 +- .../processors/query/calcite/rule/IgniteRules.java | 4 +- .../query/calcite/trait/IgniteDistributions.java | 21 +++ .../processors/query/calcite/util/Commons.java | 13 ++ .../query/calcite/CalciteQueryProcessorTest.java | 160 ++++++--------------- 8 files changed, 164 insertions(+), 129 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java index b2f91a9..d58902a 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java @@ -37,11 +37,13 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexSlot; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod; +import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.BROADCAST; import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH; /** @@ -76,6 +78,10 @@ public class IgniteMdDistribution implements MetadataHandler<IgniteMetadata.Dist return rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE); } + public DistributionTrait getDistributionTrait(IgniteTableScan rel, RelMetadataQuery mq) { + return rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE); + } + public static DistributionTrait project(RelMetadataQuery mq, RelNode input, List<RexNode> projects) { DistributionTrait trait = distribution(input, mq); @@ -120,7 +126,9 @@ public class IgniteMdDistribution implements MetadataHandler<IgniteMetadata.Dist } public static DistributionTrait join(RelMetadataQuery mq, RelNode left, RelNode right, RexNode condition) { - return distribution(left, mq); + DistributionTrait leftDist = distribution(left, mq); + + return leftDist.type() != BROADCAST ? leftDist : distribution(right, mq); } public static DistributionTrait distribution(RelNode rel, RelMetadataQuery mq) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java index 9a7606c..2ec33a06 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java @@ -49,6 +49,6 @@ public class IgniteFilterRule extends RelOptRule { RelOp<LogicalFilter, Boolean> transformOp = Commons.transformSubset(call, converted, IgniteFilter::create); if (!transformOp.go(filter)) - call.transformTo(IgniteFilter.create(filter, converted)); + call.transformTo(LogicalFilter.create(converted, filter.getCondition())); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteHashJoinRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteHashJoinRule.java index cc17c24..5bb537f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteHashJoinRule.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteHashJoinRule.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.processors.query.calcite.rule; +import com.google.common.collect.ImmutableList; +import java.util.List; +import java.util.Objects; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelTraitSet; @@ -27,10 +30,15 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.BROADCAST; +import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH; +import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.SINGLE; + /** * */ @@ -38,25 +46,59 @@ public class IgniteHashJoinRule extends RelOptRule { public static final RelOptRule INSTANCE = new IgniteHashJoinRule(); public IgniteHashJoinRule() { - super(Commons.any(LogicalJoin.class, RelNode.class), RelFactories.LOGICAL_BUILDER, "IgniteJoinRule"); + super(Commons.any(LogicalJoin.class, RelNode.class), RelFactories.LOGICAL_BUILDER, "IgniteHashJoinRule"); } @Override public void onMatch(RelOptRuleCall call) { LogicalJoin join = call.rel(0); RelTraitSet leftTraits = join.getLeft().getTraitSet() - .replace(IgniteRel.IGNITE_CONVENTION) - .replace(IgniteDistributions.hash(join.analyzeCondition().leftKeys, IgniteDistributions.noOpFunction())); + .replace(IgniteRel.IGNITE_CONVENTION); RelTraitSet rightTraits = join.getRight().getTraitSet() - .replace(IgniteRel.IGNITE_CONVENTION) - .replace(IgniteDistributions.hash(join.analyzeCondition().rightKeys, IgniteDistributions.noOpFunction())); + .replace(IgniteRel.IGNITE_CONVENTION); RelNode left = convert(join.getLeft(), leftTraits); RelNode right = convert(join.getRight(), rightTraits); RelMetadataQuery mq = call.getMetadataQuery(); + List<DistributionTrait> leftDerived; + List<DistributionTrait> rightDerived; + + if ((leftDerived = IgniteDistributions.deriveDistributions(left, mq)).isEmpty() + || (rightDerived = IgniteDistributions.deriveDistributions(right, mq)).isEmpty()) { + call.transformTo(join.copy(join.getTraitSet(), ImmutableList.of(left, right))); + + return; + } + + List<DistributionTrait> leftDists = Commons.concat(leftDerived, + IgniteDistributions.hash(join.analyzeCondition().leftKeys, IgniteDistributions.hashFunction())); + + List<DistributionTrait> rightDists = Commons.concat(rightDerived, + IgniteDistributions.hash(join.analyzeCondition().rightKeys, IgniteDistributions.hashFunction())); + + for (DistributionTrait leftDist0 : leftDists) { + for (DistributionTrait rightDist0 : rightDists) { + if (canTransform(join, leftDist0, rightDist0)) + transform(call, join, mq, leftDist0, rightDist0); + } + } + } + + private void transform(RelOptRuleCall call, LogicalJoin join, RelMetadataQuery mq, DistributionTrait leftDist, DistributionTrait rightDist) { + RelTraitSet leftTraits = join.getLeft().getTraitSet() + .replace(IgniteRel.IGNITE_CONVENTION) + .replace(leftDist); + + RelTraitSet rightTraits = join.getRight().getTraitSet() + .replace(IgniteRel.IGNITE_CONVENTION) + .replace(rightDist); + + RelNode left = convert(join.getLeft(), leftTraits); + RelNode right = convert(join.getRight(), rightTraits); + RelTraitSet traitSet = join.getTraitSet() .replace(IgniteRel.IGNITE_CONVENTION) .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.join(mq, left, right, join.getCondition())); @@ -64,4 +106,33 @@ public class IgniteHashJoinRule extends RelOptRule { call.transformTo(new IgniteHashJoin(join.getCluster(), traitSet, left, right, join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone())); } + + private boolean canTransform(LogicalJoin join, DistributionTrait leftDist, DistributionTrait rightDist) { + if (leftDist.type() == BROADCAST + && rightDist.type() == BROADCAST) + return true; + + if (rightDist.type() == SINGLE + && leftDist.type() == SINGLE) + return true; + + if (leftDist.type() == BROADCAST + && rightDist.type() == HASH + && Objects.equals(rightDist.keys(), join.analyzeCondition().rightKeys)) + return true; + + if (rightDist.type() == BROADCAST + && leftDist.type() == HASH + && Objects.equals(leftDist.keys(), join.analyzeCondition().leftKeys)) + return true; + + if (leftDist.type() == HASH + && rightDist.type() == HASH + && Objects.equals(leftDist.keys(), join.analyzeCondition().leftKeys) + && Objects.equals(rightDist.keys(), join.analyzeCondition().rightKeys) + && Objects.equals(rightDist.destinationFunctionFactory(), leftDist.destinationFunctionFactory())) + return true; + + return false; + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java index 0ea3a40..4c2993e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java @@ -49,6 +49,6 @@ public class IgniteProjectRule extends RelOptRule { RelOp<LogicalProject, Boolean> transformOp = Commons.transformSubset(call, converted, IgniteProject::create); if (!transformOp.go(project)) - call.transformTo(IgniteProject.create(project, converted)); + call.transformTo(LogicalProject.create(converted, project.getProjects(), project.getRowType())); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java index 53ce878..82f4960 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java @@ -155,14 +155,14 @@ public class IgniteRules { SubQueryRemoveRule.PROJECT, SubQueryRemoveRule.JOIN); - public static final List<RelOptRule> IGNITE_LOGICAL_RULES = ImmutableList.of( + public static final List<RelOptRule> IGNITE_RULES = ImmutableList.of( IgniteFilterRule.INSTANCE, IgniteProjectRule.INSTANCE, IgniteHashJoinRule.INSTANCE); public static List<RelOptRule> logicalRules(Context ctx) { return ImmutableList.<RelOptRule>builder() - .addAll(IGNITE_LOGICAL_RULES) + .addAll(IGNITE_RULES) .add(AbstractConverter.ExpandConversionRule.INSTANCE) .build(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java index 8f83227..57ccaf8 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java @@ -16,12 +16,18 @@ package org.apache.ignite.internal.processors.query.calcite.trait; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.concurrent.ThreadLocalRandom; import java.util.function.ToIntFunction; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution; import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH; @@ -107,4 +113,19 @@ public class IgniteDistributions { public static DestinationFunctionFactory hashFunction() { return HASH_FACTORY; } + + public static List<DistributionTrait> deriveDistributions(RelNode rel, RelMetadataQuery mq) { + if (!(rel instanceof RelSubset)) { + DistributionTrait dist = IgniteMdDistribution.distribution(rel, mq); + + return dist.type() == DistributionType.ANY ? Collections.emptyList() : Collections.singletonList(dist); + } + + HashSet<DistributionTrait> res = new HashSet<>(); + + for (RelNode relNode : ((RelSubset) rel).getRels()) + res.addAll(deriveDistributions(relNode, mq)); + + return res.isEmpty() ? Collections.emptyList() : new ArrayList<>(res); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java index ea3ecc3..671bc94 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java @@ -222,4 +222,17 @@ public final class Commons { private static int[] ensureSize(int[] array, int size) { return size < array.length ? array : Arrays.copyOf(array, U.ceilPow2(size)); } + + public static <T> List<T> concat(List<T> col, T... elements) { + ArrayList<T> res = new ArrayList<>(col.size() + elements.length); + + res.addAll(col); + Collections.addAll(res, elements); + + return res; + } + +// public String explainToString(RelNode node) { +// RelWriterImpl +// } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java index e6dc5c7..c5140c1 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java @@ -55,7 +55,6 @@ import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; /** @@ -263,119 +262,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { } @Test - public void testVolcanoPlanerLocal() throws Exception { - String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + - "FROM PUBLIC.Developer d JOIN (" + - "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + - ") p " + - "ON d.projectId = p.id0 " + - "WHERE (d.projectId + 1) > ?"; - - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); - - assertNotNull(ctx); - - RelTraitDef[] traitDefs = { - ConventionTraitDef.INSTANCE - }; - - RelRoot relRoot; - - try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ - assertNotNull(planner); - - Query query = ctx.unwrap(Query.class); - - assertNotNull(planner); - - // Parse - SqlNode sqlNode = planner.parse(query.sql()); - - // Validate - sqlNode = planner.validate(sqlNode); - - // Convert to Relational operators graph - relRoot = planner.rel(sqlNode); - - RelNode rel = relRoot.rel; - - // Transformation chain - - RelTraitSet desired = rel.getCluster().traitSet() - .replace(IgniteRel.IGNITE_CONVENTION) - .replace(IgniteDistributions.single()) - .simplify(); - - rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired); - - relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind()); - } - - assertNotNull(relRoot.rel); - } - - @Test - public void testSplitterLocal() throws Exception { - String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + - "FROM PUBLIC.Developer d JOIN (" + - "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" + - ") p " + - "ON d.projectId = p.id0 " + - "WHERE (d.projectId + 1) > ?"; - - Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); - - assertNotNull(ctx); - - RelTraitDef[] traitDefs = { - ConventionTraitDef.INSTANCE - }; - - RelRoot relRoot; - - try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ - assertNotNull(planner); - - Query query = ctx.unwrap(Query.class); - - assertNotNull(planner); - - // Parse - SqlNode sqlNode = planner.parse(query.sql()); - - // Validate - sqlNode = planner.validate(sqlNode); - - // Convert to Relational operators graph - relRoot = planner.rel(sqlNode); - - RelNode rel = relRoot.rel; - - // Transformation chain - rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet()); - - RelTraitSet desired = rel.getCluster().traitSet() - .replace(IgniteRel.IGNITE_CONVENTION) - .replace(IgniteDistributions.single()) - .simplify(); - - rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired); - - relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind()); - } - - assertNotNull(relRoot); - - QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel); - - assertNotNull(plan); - - plan.init(ctx); - - assertNotNull(plan); - } - - @Test public void testSplitterCollocatedPartitionedPartitioned() throws Exception { String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + "FROM PUBLIC.Developer d JOIN (" + @@ -440,7 +326,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { } @Test - @Ignore("Need to request broadcast trait as a variant for left inner join sub-tree.") public void testSplitterCollocatedReplicatedReplicated() throws Exception { String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + "FROM PUBLIC.Developer d JOIN (" + @@ -521,7 +406,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { } @Test - @Ignore("Need to request broadcast trait as a variant for left inner join sub-tree.") public void testSplitterCollocatedReplicatedAndPartitioned() throws Exception { String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + "FROM PUBLIC.Developer d JOIN (" + @@ -618,6 +502,30 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "ON d.projectId = p.id0 " + "WHERE (d.projectId + 1) > ?"; + TestRegistry registry = new TestRegistry(){ + @Override public DistributionTrait distribution(int cacheId, RowType rowType) { + if (cacheId == CU.cacheId("Project")) + return IgniteDistributions.broadcast(); + + return IgniteDistributions.hash(rowType.distributionKeys(), IgniteDistributions.hashFunction()); + } + + @Override public Location distributed(int cacheId, AffinityTopologyVersion topVer) { + if (cacheId == CU.cacheId("Developer")) + return new Location(null, Arrays.asList( + select(nodes, 1), + select(nodes, 2), + select(nodes, 2), + select(nodes, 0), + select(nodes, 1) + ), Location.HAS_PARTITIONED_CACHES); + if (cacheId == CU.cacheId("Project")) + return new Location(select(nodes, 0,1), null, (byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED)); + + throw new AssertionError("Unexpected cache id:" + cacheId); + } + }; + Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); assertNotNull(ctx); @@ -682,6 +590,22 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "ON d.projectId = p.ver0 " + "WHERE (d.projectId + 1) > ?"; + TestRegistry registry = new TestRegistry(){ + @Override public DistributionTrait distribution(int cacheId, RowType rowType) { + return IgniteDistributions.broadcast(); + } + + @Override public Location distributed(int cacheId, AffinityTopologyVersion topVer) { + if (cacheId == CU.cacheId("Developer")) + return new Location(select(nodes, 2), null, (byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED)); + + else if (cacheId == CU.cacheId("Project")) + return new Location(select(nodes, 0,1), null, (byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED)); + + throw new AssertionError("Unexpected cache id:" + cacheId); + } + }; + Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2}); assertNotNull(ctx); @@ -734,11 +658,10 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { assertNotNull(plan); - assertTrue(plan.fragments().size() == 4); + assertTrue(plan.fragments().size() == 3); } @Test - @Ignore("Need to request broadcast trait as a variant for left inner join sub-tree.") public void testSplitterPartiallyReplicated1() throws Exception { String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + "FROM PUBLIC.Developer d JOIN (" + @@ -828,7 +751,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { } @Test - @Ignore("Need to request broadcast trait as a variant for left inner join sub-tree.") public void testSplitterPartiallyReplicated2() throws Exception { String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " + "FROM PUBLIC.Developer d JOIN (" +