This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new e18d8a1 pending
e18d8a1 is described below
commit e18d8a17a211b61b15213ddedcc8ac637b4f5888
Author: Igor Seliverstov <[email protected]>
AuthorDate: Wed Oct 30 19:46:03 2019 +0300
pending
---
modules/calcite/pom.xml | 7 ++
.../query/calcite/CalciteQueryProcessor.java | 6 +-
.../query/calcite/exchange/Receiver.java | 14 ++++
.../processors/query/calcite/exchange/Sender.java | 38 ++++++++++
.../calcite/metadata/IgniteMdDistribution.java | 2 +-
.../metadata/IgniteMdSourceDistribution.java | 5 ++
.../calcite/rel/logical/IgniteLogicalExchange.java | 15 ++--
.../calcite/rel/logical/IgniteLogicalFilter.java | 7 +-
.../calcite/rel/logical/IgniteLogicalProject.java | 7 +-
.../rel/logical/IgniteLogicalTableScan.java | 3 +-
.../processors/query/calcite/rule/IgniteRules.java | 8 +--
.../query/calcite/rule/logical/IgniteJoinRule.java | 7 +-
.../query/calcite/schema/IgniteTable.java | 18 +++--
.../query/calcite/splitter/Fragment.java | 68 ++++++++++++++++++
.../calcite/splitter/PartitionsDistribution.java | 2 +-
.../splitter/PartitionsDistributionRegistry.java | 4 +-
.../splitter/{SplitTask.java => QueryPlan.java} | 24 +++----
.../splitter/{TaskSplitter.java => Splitter.java} | 37 ++++------
.../DistributionFunction.java} | 9 +--
.../DistributionFunctionFactory.java} | 9 +--
.../query/calcite/trait/DistributionTrait.java | 8 ++-
.../query/calcite/trait/DistributionTraitDef.java | 2 +-
.../query/calcite/trait/DistributionTraitImpl.java | 8 ++-
.../query/calcite/trait/IgniteDistributions.java | 35 ++++++++--
.../processors/query/calcite/util/Commons.java | 21 +-----
.../query/calcite/CalciteQueryProcessorTest.java | 81 ++++++++++++++--------
26 files changed, 304 insertions(+), 141 deletions(-)
diff --git a/modules/calcite/pom.xml b/modules/calcite/pom.xml
index 1fa26fa..d654edb 100644
--- a/modules/calcite/pom.xml
+++ b/modules/calcite/pom.xml
@@ -79,6 +79,13 @@
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index d1ad130..b036397 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -48,7 +48,7 @@ import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import static
org.apache.ignite.internal.processors.query.calcite.util.Commons.contextParameter;
+import static
org.apache.ignite.internal.processors.query.calcite.util.Commons.provided;
/**
*
@@ -148,8 +148,8 @@ public class CalciteQueryProcessor implements QueryEngine {
return Contexts.chain(ctx, config.getContext(),
Contexts.of(
new Query(query, params),
- contextParameter(ctx, SchemaPlus.class, schemaHolder::schema),
- contextParameter(ctx, AffinityTopologyVersion.class,
this::readyAffinityVersion)));
+ provided(ctx, SchemaPlus.class, schemaHolder::schema),
+ provided(ctx, AffinityTopologyVersion.class,
this::readyAffinityVersion)));
}
private QueryExecution prepare(Context ctx) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
index 993f55b..fe35b52 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
@@ -21,13 +21,17 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
+import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
/**
*
*/
public class Receiver extends SingleRel implements IgniteRel {
+ private SourceDistribution sourceDistribution;
+
/**
* @param cluster Cluster this relational expression belongs to
* @param traits Trait set.
@@ -50,4 +54,14 @@ public class Receiver extends SingleRel implements IgniteRel
{
@Override public <T> T accept(IgniteVisitor<T> visitor) {
return visitor.visitReceiver(this);
}
+
+ public void init(SourceDistribution targetDistribution, RelMetadataQueryEx
mq) {
+ getInput().init(targetDistribution);
+
+ sourceDistribution = getInput().sourceDistribution(mq);
+ }
+
+ public SourceDistribution sourceDistribution() {
+ return sourceDistribution;
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
index 63cf8f7..f9bd762 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
@@ -16,17 +16,28 @@
package org.apache.ignite.internal.processors.query.calcite.exchange;
+import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
+import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
/**
*
*/
public class Sender extends SingleRel implements IgniteRel {
+ private SourceDistribution sourceDistribution;
+ private SourceDistribution targetDistribution;
+ private DistributionFunction targetFunction;
+
/**
* Creates a <code>SingleRel</code>.
*
@@ -38,7 +49,34 @@ public class Sender extends SingleRel implements IgniteRel {
super(cluster, traits, input);
}
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new Sender(getCluster(), traitSet, sole(inputs));
+ }
+
@Override public <T> T accept(IgniteVisitor<T> visitor) {
return visitor.visitSender(this);
}
+
+ public void init(SourceDistribution targetDistribution) {
+ this.targetDistribution = targetDistribution;
+ }
+
+ public DistributionFunction targetFunction() {
+ if (targetFunction == null) {
+ assert targetDistribution != null &&
targetDistribution.partitionMapping != null;
+
+ DistributionTrait distribution =
getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
+
+ targetFunction =
distribution.functionFactory().create(targetDistribution, distribution.keys());
+ }
+
+ return targetFunction;
+ }
+
+ public SourceDistribution sourceDistribution(RelMetadataQuery mq) {
+ if (sourceDistribution == null)
+ sourceDistribution =
RelMetadataQueryEx.wrap(mq).getSourceDistribution(getInput());
+
+ return sourceDistribution;
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
index 298e905..f644d0e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
@@ -108,7 +108,7 @@ public class IgniteMdDistribution implements
MetadataHandler<IgniteMetadata.Dist
newKeys.add(mapped);
}
- return IgniteDistributions.hash(newKeys);
+ return IgniteDistributions.hash(newKeys, trait.functionFactory());
}
return trait;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
index 255e06c..d44a57b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
@@ -27,6 +27,7 @@ import
org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Sender;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.SourceDistributionMetadata;
import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
import
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution;
@@ -58,6 +59,10 @@ public class IgniteMdSourceDistribution implements
MetadataHandler<SourceDistrib
return distribution(rel.getInput(), mq);
}
+ public SourceDistribution getSourceDistribution(Sender rel,
RelMetadataQuery mq) {
+ return rel.sourceDistribution(mq);
+ }
+
public SourceDistribution getSourceDistribution(BiRel rel,
RelMetadataQuery mq) {
mq = RelMetadataQueryEx.wrap(mq);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
index 3a577e9..32b8b09 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
@@ -22,12 +22,12 @@ import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.Util;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
-import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
/**
@@ -52,14 +52,6 @@ public final class IgniteLogicalExchange extends SingleRel
implements IgniteRel
Util.nLogN(rowCount) * bytesPerRow, rowCount, 0);
}
- public DistributionTrait sourceDistribution() {
- return
getInput().getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
- }
-
- public DistributionTrait targetDistribution() {
- return getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
- }
-
@Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new IgniteLogicalExchange(getCluster(), traitSet, sole(inputs));
}
@@ -67,4 +59,9 @@ public final class IgniteLogicalExchange extends SingleRel
implements IgniteRel
@Override public <T> T accept(IgniteVisitor<T> visitor) {
return visitor.visitExchange(this);
}
+
+ @Override public RelWriter explainTerms(RelWriter pw) {
+ return super.explainTerms(pw)
+ .item("distribution",
getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
index 34bbe3d..0da0d04 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalFilter.java
@@ -24,11 +24,12 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rex.RexNode;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
public final class IgniteLogicalFilter extends Filter implements IgniteRel {
private final Set<CorrelationId> variablesSet;
@@ -53,10 +54,10 @@ public final class IgniteLogicalFilter extends Filter
implements IgniteRel {
.itemIf("variablesSet", variablesSet, !variablesSet.isEmpty());
}
- public static IgniteLogicalFilter create(LogicalFilter filter, RelNode
input) {
+ public static IgniteLogicalFilter create(Filter filter, RelNode input) {
RelTraitSet traits = filter.getTraitSet()
.replace(IgniteRel.LOGICAL_CONVENTION)
-
.replace(IgniteMdDistribution.filter(filter.getCluster().getMetadataQuery(),
input, filter.getCondition()));
+ .replaceIf(DistributionTraitDef.INSTANCE, () ->
IgniteMdDistribution.filter(RelMetadataQueryEx.instance(), input,
filter.getCondition()));
return new IgniteLogicalFilter(filter.getCluster(), traits, input,
filter.getCondition(), filter.getVariablesSet());
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
index 56c0f50..e67d371 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalProject.java
@@ -21,12 +21,13 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
public final class IgniteLogicalProject extends Project implements IgniteRel {
public IgniteLogicalProject(
@@ -43,10 +44,10 @@ public final class IgniteLogicalProject extends Project
implements IgniteRel {
return new IgniteLogicalProject(getCluster(), traitSet, input, projects,
rowType);
}
- public static IgniteLogicalProject create(LogicalProject project, RelNode
input) {
+ public static IgniteLogicalProject create(Project project, RelNode input) {
RelTraitSet traits = project.getTraitSet()
.replace(IgniteRel.LOGICAL_CONVENTION)
-
.replace(IgniteMdDistribution.project(project.getCluster().getMetadataQuery(),
input, project.getProjects()));
+ .replaceIf(DistributionTraitDef.INSTANCE, () ->
IgniteMdDistribution.project(RelMetadataQueryEx.instance(), input,
project.getProjects()));
return new IgniteLogicalProject(project.getCluster(), traits, input,
project.getProjects(), project.getRowType());
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
index ca2ae90..fb4cf58 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
@@ -39,7 +39,8 @@ public final class IgniteLogicalTableScan extends TableScan
implements IgniteRel
}
public SourceDistribution tableDistribution() {
- return
getTable().unwrap(IgniteTable.class).tableDistribution(getCluster().getPlanner().getContext());
+ return getTable().unwrap(IgniteTable.class)
+ .sourceDistribution(getCluster().getPlanner().getContext());
}
@Override public <T> T accept(IgniteVisitor<T> visitor) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
index 7873c6c..2d155f5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
@@ -158,17 +158,15 @@ public class IgniteRules {
SubQueryRemoveRule.PROJECT,
SubQueryRemoveRule.JOIN);
- public static final List<RelOptRule> IGNITE_BASE_RULES = ImmutableList.of(
+ public static final List<RelOptRule> IGNITE_LOGICAL_RULES =
ImmutableList.of(
IgniteFilterRule.INSTANCE,
IgniteProjectRule.INSTANCE,
IgniteJoinRule.INSTANCE);
public static List<RelOptRule> logicalRules(Context ctx) {
return ImmutableList.<RelOptRule>builder()
-// .addAll(BASE_RULES)
-// .addAll(ABSTRACT_RULES)
- .addAll(ABSTRACT_RELATIONAL_RULES)
- .addAll(IGNITE_BASE_RULES)
+ .addAll(IGNITE_LOGICAL_RULES)
+ .add(AbstractConverter.ExpandConversionRule.INSTANCE)
.build();
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java
index a5abdea..3913cb4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -45,11 +46,11 @@ public class IgniteJoinRule extends RelOptRule {
RelTraitSet leftTraits = join.getLeft().getTraitSet()
.replace(IgniteRel.LOGICAL_CONVENTION)
-
.replace(IgniteDistributions.hash(join.analyzeCondition().leftKeys));
+
.replace(IgniteDistributions.hash(join.analyzeCondition().leftKeys,
IgniteDistributions.noOpFunction()));
RelTraitSet rightTraits = join.getRight().getTraitSet()
.replace(IgniteRel.LOGICAL_CONVENTION)
-
.replace(IgniteDistributions.hash(join.analyzeCondition().rightKeys));
+
.replace(IgniteDistributions.hash(join.analyzeCondition().rightKeys,
IgniteDistributions.noOpFunction()));
RelNode left = convert(join.getLeft(), leftTraits);
RelNode right = convert(join.getRight(), rightTraits);
@@ -58,7 +59,7 @@ public class IgniteJoinRule extends RelOptRule {
RelTraitSet traitSet = join.getTraitSet()
.replace(IgniteRel.LOGICAL_CONVENTION)
- .replace(IgniteMdDistribution.join(mq, left, right,
join.getCondition()));
+ .replaceIf(DistributionTraitDef.INSTANCE, () ->
IgniteMdDistribution.join(mq, left, right, join.getCondition()));
call.transformTo(new IgniteLogicalJoin(join.getCluster(), traitSet,
left, right,
join.getCondition(), join.getVariablesSet(), join.getJoinType(),
join.isSemiJoinDone()));
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index bccf570..bf8b752 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -31,6 +31,7 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
import
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry;
import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.util.GridIntList;
@@ -71,23 +72,26 @@ public class IgniteTable extends AbstractTable implements
TranslatableTable {
@Override public RelNode toRel(RelOptTable.ToRelContext context,
RelOptTable relOptTable) {
RelOptCluster cluster = context.getCluster();
RelTraitSet traitSet =
cluster.traitSet().replace(IgniteRel.LOGICAL_CONVENTION)
- .replaceIf(DistributionTraitDef.INSTANCE, () ->
IgniteDistributions.hash(rowType.distributionKeys()));
+ .replaceIf(DistributionTraitDef.INSTANCE, () ->
distributionTrait(cluster.getPlanner().getContext()));
return new IgniteLogicalTableScan(cluster, traitSet, relOptTable);
}
- public SourceDistribution tableDistribution(Context context) {
- SourceDistribution res = new SourceDistribution();
+ public DistributionTrait distributionTrait(Context context) {
+ return IgniteDistributions.hash(rowType.distributionKeys(),
IgniteDistributions.noOpFunction()); // TODO
+ }
- GridIntList localInputs = new GridIntList();
+ public SourceDistribution sourceDistribution(Context context) {
+ int cacheId = CU.cacheId(cacheName);
- localInputs.add(CU.cacheId(cacheName));
+ SourceDistribution res = new SourceDistribution();
+ GridIntList localInputs = new GridIntList();
+ localInputs.add(cacheId);
res.localInputs = localInputs;
PartitionsDistributionRegistry registry =
context.unwrap(PartitionsDistributionRegistry.class);
AffinityTopologyVersion topVer =
context.unwrap(AffinityTopologyVersion.class);
-
- res.partitionMapping =
registry.partitionsDistribution(CU.cacheId(cacheName), topVer);
+ res.partitionMapping = registry.get(cacheId, topVer);
return res;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
new file mode 100644
index 0000000..f9ee3d1
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.splitter;
+
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Sender;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ *
+ */
+public class Fragment {
+ public final RelNode root;
+
+ public SourceDistribution distribution;
+
+ public Fragment(RelNode root) {
+ this.root = root;
+ }
+
+ public void init(Context ctx) {
+ RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
+
+ distribution = mq.getSourceDistribution(root);
+
+ PartitionsDistribution mapping = distribution.partitionMapping;
+
+ if (mapping == null)
+ distribution.partitionMapping = isRootFragment() ?
registry(ctx).single() : registry(ctx).random(topologyVersion(ctx));
+ else if (mapping.excessive)
+ distribution.partitionMapping = mapping.deduplicate();
+
+ if (!F.isEmpty(distribution.remoteInputs)) {
+ for (Receiver input : distribution.remoteInputs)
+ input.init(distribution, mq);
+ }
+ }
+
+ private boolean isRootFragment() {
+ return !(root instanceof Sender);
+ }
+
+ private PartitionsDistributionRegistry registry(Context ctx) {
+ return ctx.unwrap(PartitionsDistributionRegistry.class);
+ }
+
+ private AffinityTopologyVersion topologyVersion(Context ctx) {
+ return ctx.unwrap(AffinityTopologyVersion.class);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
index 2a7707c..72ed1fa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistribution.java
@@ -42,7 +42,7 @@ public class PartitionsDistribution {
int i = 0, j = 0, k = 0;
- while (i < nodes0.length && j < other.nodes.length) {
+ while (i < nodes.length && j < other.nodes.length) {
if (nodes[i] < other.nodes[j])
i++;
else if (other.nodes[j] < nodes[i])
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
index f234ae7..568ce65 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
@@ -22,5 +22,7 @@ import
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
*
*/
public interface PartitionsDistributionRegistry {
- PartitionsDistribution partitionsDistribution(int cacheId,
AffinityTopologyVersion topVer);
+ PartitionsDistribution get(int cacheId, AffinityTopologyVersion topVer);
+ PartitionsDistribution random(AffinityTopologyVersion topVer);
+ PartitionsDistribution single();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
similarity index 60%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index 1fbe086..84985e6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -16,26 +16,22 @@
package org.apache.ignite.internal.processors.query.calcite.splitter;
-import org.apache.calcite.rel.RelNode;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.Context;
/**
*
*/
-public class SplitTask {
- public final SourceDistribution distribution;
- public final RelNode root;
+public class QueryPlan {
+ private final ImmutableList<Fragment> fragments;
- public SplitTask(RelNode root, SourceDistribution distribution) {
- this.distribution = distribution;
- this.root = root;
-
- init();
+ public QueryPlan(ImmutableList<Fragment> fragments) {
+ this.fragments = fragments;
}
- private void init() {
- PartitionsDistribution mapping = distribution.partitionMapping;
-
- if (mapping != null && mapping.excessive)
- distribution.partitionMapping = mapping.deduplicate();
+ public void init(Context ctx) {
+ for (Fragment fragment : fragments) {
+ fragment.init(ctx);
+ }
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
similarity index 73%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
index 22527b3..5220d74 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
@@ -16,15 +16,13 @@
package org.apache.ignite.internal.processors.query.calcite.splitter;
-import java.util.ArrayList;
+import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
import org.apache.ignite.internal.processors.query.calcite.exchange.Sender;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdSourceDistribution;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteVisitor;
import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalExchange;
@@ -36,35 +34,26 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLog
/**
*
*/
-public class TaskSplitter implements IgniteVisitor<IgniteRel> {
- /** */
- private List<IgniteRel> roots;
+public class Splitter implements IgniteVisitor<IgniteRel> {
+ private ImmutableList.Builder<Fragment> b;
- public List<SplitTask> go(IgniteRel root) {
- roots = new ArrayList<>();
+ public QueryPlan go(IgniteRel root) {
+ b = ImmutableList.builder();
- roots.add(root.accept(this));
-
- ArrayList<SplitTask> splitTasks = new ArrayList<>(roots.size());
-
- RelMetadataQuery mq = RelMetadataQueryEx.instance();
-
- for (IgniteRel igniteRel : roots) {
- splitTasks.add(new SplitTask(igniteRel,
IgniteMdSourceDistribution.distribution(igniteRel, mq)));
- }
-
- return splitTasks;
+ return new QueryPlan(b.add(new Fragment(root.accept(this))).build());
}
@Override public IgniteRel visitExchange(IgniteLogicalExchange exchange) {
RelOptCluster cluster = exchange.getCluster();
- RelNode input = exchange.getInput();
+ RelTraitSet traitSet = exchange.getTraitSet();
+
+ IgniteRel input = visitChildren(exchange.getInput());
- Sender sender = new Sender(cluster, input.getTraitSet(),
visitChildren(input));
+ Sender sender = new Sender(cluster, traitSet, input);
- roots.add(sender);
+ b.add(new Fragment(sender));
- return new Receiver(cluster, exchange.getTraitSet(), sender);
+ return new Receiver(cluster, traitSet, sender);
}
@Override public IgniteRel visitFilter(IgniteLogicalFilter filter) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
similarity index 70%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
index f234ae7..76af490 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunction.java
@@ -14,13 +14,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.trait;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
/**
*
*/
-public interface PartitionsDistributionRegistry {
- PartitionsDistribution partitionsDistribution(int cacheId,
AffinityTopologyVersion topVer);
+public interface DistributionFunction {
+ List<ClusterNode> destination(Object row);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
similarity index 67%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
index f234ae7..8bc129e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/PartitionsDistributionRegistry.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionFunctionFactory.java
@@ -14,13 +14,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.splitter;
+package org.apache.ignite.internal.processors.query.calcite.trait;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.calcite.util.ImmutableIntList;
+import
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
/**
*
*/
-public interface PartitionsDistributionRegistry {
- PartitionsDistribution partitionsDistribution(int cacheId,
AffinityTopologyVersion topVer);
+public interface DistributionFunctionFactory {
+ DistributionFunction create(SourceDistribution targetDistr,
ImmutableIntList keys);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
index 3dabee1..2510c2c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
@@ -16,6 +16,7 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
+import java.util.Objects;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.util.ImmutableIntList;
@@ -45,10 +46,10 @@ public interface DistributionTrait extends RelTrait {
}
}
- DistributionTrait ANY = IgniteDistributions.single();
-
DistributionType type();
+ DistributionFunctionFactory functionFactory();
+
@Override default RelTraitDef getTraitDef() {
return DistributionTraitDef.INSTANCE;
}
@@ -66,7 +67,8 @@ public interface DistributionTrait extends RelTrait {
return true;
if (type() == other.type())
- return type() != DistributionType.HASH ||
keys().equals(other.keys());
+ return type() != DistributionType.HASH
+ || (Objects.equals(keys(), other.keys()) &&
Objects.equals(functionFactory(), other.functionFactory()));
return other.type() == DistributionType.RANDOM && type() ==
DistributionType.HASH;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
index facc258..ef614bc 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
@@ -64,6 +64,6 @@ public class DistributionTraitDef extends
RelTraitDef<DistributionTrait> {
}
@Override public DistributionTrait getDefault() {
- return DistributionTrait.ANY;
+ return IgniteDistributions.any();
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
index 5b7b748..099c96b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
@@ -26,16 +26,22 @@ import org.apache.calcite.util.ImmutableIntList;
public class DistributionTraitImpl implements DistributionTrait {
private final DistributionType type;
private final ImmutableIntList keys;
+ private final DistributionFunctionFactory functionFactory;
- public DistributionTraitImpl(DistributionType type, ImmutableIntList keys)
{
+ public DistributionTraitImpl(DistributionType type, ImmutableIntList keys,
DistributionFunctionFactory functionFactory) {
this.type = type;
this.keys = keys;
+ this.functionFactory = functionFactory;
}
@Override public DistributionType type() {
return type;
}
+ @Override public DistributionFunctionFactory functionFactory() {
+ return functionFactory;
+ }
+
@Override public ImmutableIntList keys() {
return keys;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 7537d63..59f69fe 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -25,11 +25,12 @@ import static
org.apache.ignite.internal.processors.query.calcite.trait.Distribu
*
*/
public class IgniteDistributions {
- /** */
- private static final DistributionTraitDef traitDef =
DistributionTraitDef.INSTANCE;
- private static final DistributionTrait SINGLE = traitDef.canonize(new
DistributionTraitImpl(DistributionTrait.DistributionType.SINGLE,
ImmutableIntList.of()));
- private static final DistributionTrait RANDOM = traitDef.canonize(new
DistributionTraitImpl(DistributionTrait.DistributionType.RANDOM,
ImmutableIntList.of()));
- private static final DistributionTrait ANY = traitDef.canonize(new
DistributionTraitImpl(DistributionTrait.DistributionType.ANY,
ImmutableIntList.of()));
+ private static final DistributionFunctionFactory NO_OP_FACTORY = (t,k) ->
null;
+
+ private static final DistributionTrait BROADCAST = new
DistributionTraitImpl(DistributionTrait.DistributionType.BROADCAST,
ImmutableIntList.of(), allTargetsFunction());
+ private static final DistributionTrait SINGLE = new
DistributionTraitImpl(DistributionTrait.DistributionType.SINGLE,
ImmutableIntList.of(), singleTargetFunction());
+ private static final DistributionTrait RANDOM = new
DistributionTraitImpl(DistributionTrait.DistributionType.RANDOM,
ImmutableIntList.of(), randomTargetFunction());
+ private static final DistributionTrait ANY = new
DistributionTraitImpl(DistributionTrait.DistributionType.ANY,
ImmutableIntList.of(), noOpFunction());
public static DistributionTrait any() {
return ANY;
@@ -43,7 +44,27 @@ public class IgniteDistributions {
return SINGLE;
}
- public static DistributionTrait hash(List<Integer> keys) {
- return traitDef.canonize(new DistributionTraitImpl(HASH,
ImmutableIntList.copyOf(keys)));
+ public static DistributionTrait broadcast() {
+ return BROADCAST;
+ }
+
+ public static DistributionTrait hash(List<Integer> keys,
DistributionFunctionFactory factory) {
+ return new DistributionTraitImpl(HASH, ImmutableIntList.copyOf(keys),
factory);
+ }
+
+ public static DistributionFunctionFactory noOpFunction() {
+ return NO_OP_FACTORY;
+ }
+
+ public static DistributionFunctionFactory singleTargetFunction() {
+ return noOpFunction(); // TODO
+ }
+
+ public static DistributionFunctionFactory allTargetsFunction() {
+ return noOpFunction(); // TODO
+ }
+
+ public static DistributionFunctionFactory randomTargetFunction() {
+ return noOpFunction(); // TODO
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index b638815..739d411 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -31,7 +31,6 @@ import org.apache.calcite.plan.RelOptNode;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.RelNode;
@@ -52,11 +51,11 @@ public final class Commons {
return ctx == null ? Contexts.empty() :
Contexts.of(ctx.unwrap(Object[].class));
}
- public static <T> @Nullable T contextParameter(Context ctx, Class<T>
paramType, Supplier<T> paramSrc) {
+ public static <T> @Nullable T provided(Context ctx, Class<T> paramType,
Supplier<T> paramSrc) {
T param = ctx.unwrap(paramType);
if (param != null)
- return param;
+ return null; // Provided by parent context.
return paramSrc.get();
}
@@ -81,26 +80,10 @@ public final class Commons {
return b.build();
}
- public static RelOptRuleOperand any(Class<? extends RelNode> first,
RelTrait trait){
- return RelOptRule.operand(first, trait, RelOptRule.any());
- }
-
- public static RelOptRuleOperand any(Class<? extends RelNode> first){
- return RelOptRule.operand(first, RelOptRule.any());
- }
-
public static RelOptRuleOperand any(Class<? extends RelNode> first,
Class<? extends RelNode> second) {
return RelOptRule.operand(first, RelOptRule.operand(second,
RelOptRule.any()));
}
- public static RelOptRuleOperand some(Class<? extends RelNode> rel,
RelOptRuleOperand first, RelOptRuleOperand... rest){
- return RelOptRule.operand(rel, RelOptRule.some(first, rest));
- }
-
- public static RelOptRuleOperand some(Class<? extends RelNode> rel,
RelTrait trait, RelOptRuleOperand first, RelOptRuleOperand... rest){
- return RelOptRule.operand(rel, trait, RelOptRule.some(first, rest));
- }
-
public static <T extends RelNode> RelOp<T, Boolean>
transformSubset(RelOptRuleCall call, RelNode input, BiFunction<T, RelNode,
RelNode> transformFun) {
return rel -> {
if (!(input instanceof RelSubset))
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 884b9ca..d9ebd5b 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite;
-import java.util.List;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
@@ -40,13 +39,12 @@ import
org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
import
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistribution;
import
org.apache.ignite.internal.processors.query.calcite.splitter.PartitionsDistributionRegistry;
-import org.apache.ignite.internal.processors.query.calcite.splitter.SplitTask;
-import
org.apache.ignite.internal.processors.query.calcite.splitter.TaskSplitter;
+import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
-import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -54,7 +52,7 @@ import org.junit.Test;
/**
*
*/
-@WithSystemProperty(key = "calcite.debug", value = "true")
+//@WithSystemProperty(key = "calcite.debug", value = "true")
public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
private static CalciteQueryProcessor proc;
@@ -62,6 +60,9 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
private static PartitionsDistribution developerDistribution;
private static PartitionsDistribution projectDistribution;
+ private static PartitionsDistribution randomDistribution;
+ private static PartitionsDistribution singleDistribution;
+ private static PartitionsDistributionRegistry registry;
@BeforeClass
public static void setupClass() {
@@ -80,12 +81,6 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
.field("cityId", Integer.class)
.build()));
- developerDistribution = new PartitionsDistribution();
-
- developerDistribution.parts = 5;
- developerDistribution.nodes = new int[]{0,1,2};
- developerDistribution.nodeParts = new int[][]{{1,2},{3,4},{5}};
-
publicSchema.addTable("Project", new IgniteTable("Project", "Project",
RowType.builder()
.keyField("id", Integer.class, true)
@@ -93,12 +88,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
.field("ver", Integer.class)
.build()));
- projectDistribution = new PartitionsDistribution();
- projectDistribution.excessive = true;
- projectDistribution.parts = 5;
- projectDistribution.nodes = new int[]{0,1,2};
- projectDistribution.nodeParts = new int[][]{{1,2,3,5},{2,3,4},{1,4,5}};
publicSchema.addTable("Country", new IgniteTable("Country", "Country",
RowType.builder()
@@ -117,6 +107,48 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
schema = Frameworks.createRootSchema(false);
schema.add("PUBLIC", publicSchema);
+
+ developerDistribution = new PartitionsDistribution();
+
+ developerDistribution.parts = 5;
+ developerDistribution.nodes = new int[]{0,1,2};
+ developerDistribution.nodeParts = new int[][]{{1,2},{3,4},{5}};
+
+ projectDistribution = new PartitionsDistribution();
+
+ projectDistribution.excessive = true;
+ projectDistribution.parts = 5;
+ projectDistribution.nodes = new int[]{0,1,2};
+ projectDistribution.nodeParts = new int[][]{{1,2,3,5},{2,3,4},{1,4,5}};
+
+ randomDistribution = new PartitionsDistribution();
+ randomDistribution.parts = 3;
+ randomDistribution.nodes = new int[]{0,1,2};
+ randomDistribution.nodeParts = new int[][]{{1},{2},{3}};
+
+ singleDistribution = new PartitionsDistribution();
+ singleDistribution.parts = 1;
+ singleDistribution.nodes = new int[]{0};
+ singleDistribution.nodeParts = new int[][]{{1}};
+
+ registry = new PartitionsDistributionRegistry() {
+ @Override public PartitionsDistribution get(int cacheId,
AffinityTopologyVersion topVer) {
+ if (cacheId == CU.cacheId("Developer"))
+ return developerDistribution;
+ if (cacheId == CU.cacheId("Project"))
+ return projectDistribution;
+
+ throw new AssertionError("Unexpected cache id:" + cacheId);
+ }
+
+ @Override public PartitionsDistribution
random(AffinityTopologyVersion topVer) {
+ return randomDistribution;
+ }
+
+ @Override public PartitionsDistribution single() {
+ return singleDistribution;
+ }
+ };
}
@Test
@@ -128,15 +160,6 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
"ON d.projectId = p.id0 " +
"WHERE (d.projectId + 1) > ?";
- PartitionsDistributionRegistry registry = (id, top) -> {
- if (id == CU.cacheId("Developer"))
- return developerDistribution;
- if (id == CU.cacheId("Project"))
- return projectDistribution;
-
- throw new AssertionError("Unexpected cache id:" + id);
- };
-
Context ctx = proc.context(Contexts.of(schema, registry,
AffinityTopologyVersion.NONE), sql, new Object[]{2});
assertNotNull(ctx);
@@ -181,8 +204,12 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
assertNotNull(relRoot);
- List<SplitTask> fragments = new TaskSplitter().go((IgniteRel)
relRoot.rel);
+ QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+
+ assertNotNull(plan);
+
+ plan.init(ctx);
- assertNotNull(fragments);
+ assertNotNull(plan);
}
}
\ No newline at end of file