This is an automated email from the ASF dual-hosted git repository. gvvinblade pushed a commit to branch ignite-12248 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 9179fd00cc2016f5fa1d219fa5a462c36b00c319 Author: Igor Seliverstov <[email protected]> AuthorDate: Fri Oct 25 17:52:53 2019 +0300 pending --- .../query/calcite/CalciteQueryProcessor.java | 37 ++--- .../calcite/exchange/DistributionRegistry.java | 28 ++++ .../query/calcite/exchange/Receiver.java | 48 +++++++ .../processors/query/calcite/exchange/Sender.java | 39 ++++++ .../calcite/metadata/IgniteMdDistribution.java | 44 +++--- .../metadata/IgniteMdSourceDistribution.java | 155 +++++++++++++++++++++ .../query/calcite/metadata/IgniteMetadata.java | 30 ++-- .../calcite/rel/logical/IgniteLogicalExchange.java | 12 +- .../rel/logical/IgniteLogicalTableScan.java | 9 +- .../query/calcite/rule/logical/IgniteJoinRule.java | 9 +- .../schema/CalciteSchemaChangeListener.java | 64 --------- .../query/calcite/schema/CalciteSchemaHolder.java | 47 +++++-- .../query/calcite/schema/IgniteSchema.java | 2 +- .../query/calcite/schema/IgniteTable.java | 48 ++++--- .../processors/query/calcite/schema/RowType.java | 140 +++++++++++++++++++ .../query/calcite/schema/SchemaProvider.java | 27 ---- .../query/calcite/schema/TableDescriptor.java | 27 ---- .../query/calcite/splitter/SourceDistribution.java | 32 +++++ .../query/calcite/splitter/SplitTask.java | 32 +++++ .../query/calcite/splitter/TaskSplitter.java | 26 ++++ ...iteDistribution.java => DistributionTrait.java} | 21 +-- ...tionTraitDef.java => DistributionTraitDef.java} | 18 +-- ...ibutionImpl.java => DistributionTraitImpl.java} | 27 +--- .../query/calcite/trait/IgniteDistributions.java | 25 ++-- .../processors/query/calcite/util/Commons.java | 43 ++++-- .../query/calcite/util/IgniteMethod.java | 6 +- .../query/calcite/util/ScanIterator.java | 2 +- .../query/calcite/CalciteQueryProcessorTest.java | 98 +++++-------- 28 files changed, 751 insertions(+), 345 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index 737f19b..a024027 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -33,6 +33,7 @@ import org.apache.calcite.tools.Frameworks; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryContext; import org.apache.ignite.internal.processors.query.QueryEngine; @@ -40,7 +41,6 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.DistributedEx import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner; import org.apache.ignite.internal.processors.query.calcite.prepare.Query; import org.apache.ignite.internal.processors.query.calcite.prepare.QueryExecution; -import org.apache.ignite.internal.processors.query.calcite.schema.CalciteSchemaChangeListener; import org.apache.ignite.internal.processors.query.calcite.schema.CalciteSchemaHolder; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; @@ -48,6 +48,8 @@ import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.processors.query.calcite.util.Commons.contextParameter; + /** * */ @@ -62,7 +64,7 @@ public class CalciteQueryProcessor implements QueryEngine { private IgniteLogger log; /** */ - private GridKernalContext ctx; + private GridKernalContext kernalContext; public CalciteQueryProcessor() { config = Frameworks.newConfigBuilder() @@ -94,12 +96,12 @@ public class CalciteQueryProcessor implements QueryEngine { /** {@inheritDoc} */ @Override public void start(@NotNull GridKernalContext ctx) { - this.ctx = ctx; + kernalContext = ctx; GridInternalSubscriptionProcessor prc = ctx.internalSubscriptionProcessor(); if (prc != null) // Stubbed context doesn't have such processor - prc.registerSchemaChangeListener(new CalciteSchemaChangeListener(schemaHolder)); + prc.registerSchemaChangeListener(schemaHolder); } /** {@inheritDoc} */ @@ -122,7 +124,7 @@ public class CalciteQueryProcessor implements QueryEngine { } public GridKernalContext context() { - return ctx; + return kernalContext; } /** */ @@ -136,10 +138,6 @@ public class CalciteQueryProcessor implements QueryEngine { return new IgnitePlanner(cfg); } - private QueryExecution prepare(Context ctx) { - return new DistributedExecution(ctx); - } - /** * @param ctx External context. * @param query Query string. @@ -147,16 +145,19 @@ public class CalciteQueryProcessor implements QueryEngine { * @return Query execution context. */ Context context(@NotNull Context ctx, String query, Object[] params) { // Package private visibility for tests. - return Contexts.chain( - config.getContext(), - Contexts.of(schemaHolder.schema(), new Query(query, params)), - ctx); + return Contexts.chain(ctx, + Contexts.of( + new Query(query, params), + contextParameter(ctx, SchemaPlus.class, schemaHolder::schema), + contextParameter(ctx, AffinityTopologyVersion.class, this::readyAffinityVersion)), + config.getContext()); } - /** - * @return Schema provider. - */ - CalciteSchemaHolder schemaHolder() { // Package private visibility for tests. - return schemaHolder; + private QueryExecution prepare(Context ctx) { + return new DistributedExecution(ctx); + } + + private AffinityTopologyVersion readyAffinityVersion() { + return kernalContext.cache().context().exchange().readyAffinityVersion(); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/DistributionRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/DistributionRegistry.java new file mode 100644 index 0000000..a2b0942 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/DistributionRegistry.java @@ -0,0 +1,28 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exchange; + +import java.util.Map; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; + +/** + * + */ +public interface DistributionRegistry { + Map<ClusterNode, int[]> partitionMapping(int cacheId, AffinityTopologyVersion topVer); +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java new file mode 100644 index 0000000..8d2ee1c --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java @@ -0,0 +1,48 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exchange; + +import java.util.List; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.SingleRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; + +/** + * + */ +public class Receiver extends SingleRel implements IgniteRel { + /** + * @param cluster Cluster this relational expression belongs to + * @param traits Trait set. + * @param sender Corresponding sender. + */ + protected Receiver(RelOptCluster cluster, RelTraitSet traits, Sender sender) { + super(cluster, traits, sender); + } + + /** {@inheritDoc} */ + @Override public Sender getInput() { + return (Sender) input; + } + + /** {@inheritDoc} */ + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new Receiver(getCluster(), traitSet, (Sender) sole(inputs)); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java new file mode 100644 index 0000000..5049250 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java @@ -0,0 +1,39 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exchange; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.SingleRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; + +/** + * + */ +public class Sender extends SingleRel implements IgniteRel { + /** + * Creates a <code>SingleRel</code>. + * + * @param cluster Cluster this relational expression belongs to + * @param traits Trait set. + * @param input Input relational expression + */ + protected Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input) { + super(cluster, traits, input); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java index a35c342..ded1a8d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java @@ -37,52 +37,52 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexSlot; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.util.ImmutableIntList; -import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; -import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributionTraitDef; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod; -import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution.DistributionType.HASH; +import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait.DistributionType.HASH; /** * */ -public class IgniteMdDistribution implements MetadataHandler<IgniteMetadata.Distribution> { +public class IgniteMdDistribution implements MetadataHandler<IgniteMetadata.TraitDistribution> { public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.DISTRIBUTION.method(), new IgniteMdDistribution()); - @Override public MetadataDef<IgniteMetadata.Distribution> getDef() { - return IgniteMetadata.Distribution.DEF; + @Override public MetadataDef<IgniteMetadata.TraitDistribution> getDef() { + return IgniteMetadata.TraitDistribution.DEF; } - public IgniteDistribution distribution(RelNode rel, RelMetadataQuery mq) { - return IgniteDistributionTraitDef.INSTANCE.getDefault(); + public DistributionTrait distribution(RelNode rel, RelMetadataQuery mq) { + return DistributionTraitDef.INSTANCE.getDefault(); } - public IgniteDistribution distribution(Filter filter, RelMetadataQuery mq) { + public DistributionTrait distribution(Filter filter, RelMetadataQuery mq) { return filter(mq, filter.getInput(), filter.getCondition()); } - public IgniteDistribution distribution(Project project, RelMetadataQuery mq) { + public DistributionTrait distribution(Project project, RelMetadataQuery mq) { return project(mq, project.getInput(), project.getProjects()); } - public IgniteDistribution distribution(Join join, RelMetadataQuery mq) { + public DistributionTrait distribution(Join join, RelMetadataQuery mq) { return join(mq, join.getLeft(), join.getRight(), join.getCondition()); } - public IgniteDistribution distribution(RelSubset rel, RelMetadataQuery mq) { - return rel.getTraitSet().getTrait(IgniteDistributionTraitDef.INSTANCE); + public DistributionTrait distribution(RelSubset rel, RelMetadataQuery mq) { + return rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE); } - public static IgniteDistribution project(RelMetadataQuery mq, RelNode input, List<RexNode> projects) { - IgniteDistribution trait = distribution_(input, mq); + public static DistributionTrait project(RelMetadataQuery mq, RelNode input, List<RexNode> projects) { + DistributionTrait trait = distribution_(input, mq); if (trait.type() == HASH) { ImmutableIntList keys = trait.keys(); if (keys.size() > projects.size()) - return IgniteDistributions.random(trait.sources()); + return IgniteDistributions.random(); Map<Integer, Integer> m = new HashMap<>(projects.size()); @@ -103,26 +103,26 @@ public class IgniteMdDistribution implements MetadataHandler<IgniteMetadata.Dist Integer mapped = m.get(key); if (mapped == null) - return IgniteDistributions.random(trait.sources()); + return IgniteDistributions.random(); newKeys.add(mapped); } - return IgniteDistributions.hash(newKeys, trait.sources()); + return IgniteDistributions.hash(newKeys); } return trait; } - public static IgniteDistribution filter(RelMetadataQuery mq, RelNode input, RexNode condition) { + public static DistributionTrait filter(RelMetadataQuery mq, RelNode input, RexNode condition) { return distribution_(input, mq); } - public static IgniteDistribution join(RelMetadataQuery mq, RelNode left, RelNode right, RexNode condition) { + public static DistributionTrait join(RelMetadataQuery mq, RelNode left, RelNode right, RexNode condition) { return distribution_(left, mq); } - public static IgniteDistribution distribution_(RelNode rel, RelMetadataQuery mq) { - return rel.metadata(IgniteMetadata.Distribution.class, mq).distribution(); + public static DistributionTrait distribution_(RelNode rel, RelMetadataQuery mq) { + return rel.metadata(IgniteMetadata.TraitDistribution.class, mq).distribution(); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java new file mode 100644 index 0000000..1c80b96 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java @@ -0,0 +1,155 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.metadata; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.BiRel; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.SingleRel; +import org.apache.calcite.rel.metadata.MetadataDef; +import org.apache.calcite.rel.metadata.MetadataHandler; +import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver; +import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.TaskDistribution; +import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan; +import org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution; +import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod; +import org.apache.ignite.internal.util.GridIntList; + +/** + * + */ +public class IgniteMdSourceDistribution implements MetadataHandler<TaskDistribution> { + public static final RelMetadataProvider SOURCE = + ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.TASK_DISTRIBUTION.method(), new IgniteMdSourceDistribution()); + + @Override public MetadataDef<TaskDistribution> getDef() { + return TaskDistribution.DEF; + } + + public SourceDistribution distribution(RelNode rel, RelMetadataQuery mq) { + throw new AssertionError(); + } + + public SourceDistribution distribution(RelSubset rel, RelMetadataQuery mq) { + throw new AssertionError(); + } + + public SourceDistribution distribution(SingleRel rel, RelMetadataQuery mq) { + return distribution_(rel.getInput(), mq); + } + + public SourceDistribution distribution(BiRel rel, RelMetadataQuery mq) { + return merge(distribution_(rel.getLeft(), mq), distribution_(rel.getRight(), mq)); + } + + public SourceDistribution distribution(Receiver rel, RelMetadataQuery mq) { + SourceDistribution res = new SourceDistribution(); + + res.remoteInputs.add(rel); + + return res; + } + + public SourceDistribution distribution(IgniteLogicalTableScan rel, RelMetadataQuery mq) { + return rel.tableDistribution(); + } + + public static SourceDistribution distribution_(RelNode rel, RelMetadataQuery mq) { + return rel.metadata(TaskDistribution.class, mq).distribution(); + } + + private static SourceDistribution merge(SourceDistribution left, SourceDistribution right) { + SourceDistribution res = new SourceDistribution(); + + res.remoteInputs = merge(left.remoteInputs, right.remoteInputs); + res.partitionMapping = merge(left.partitionMapping, right.partitionMapping); + res.localInputs = merge(left.localInputs, right.localInputs); + + return res; + } + + private static <T> List<T> merge(List<T> left, List<T> right) { + if (left == null) + return right; + + if (right != null) + left.addAll(right); + + return left; + } + + private static GridIntList merge(GridIntList left, GridIntList right) { + if (left == null) + return right; + + if (right != null) + left.addAll(right); + + return left; + } + + private static Map<ClusterNode, int[]> merge(Map<ClusterNode, int[]> left, Map<ClusterNode, int[]> right) { + if (left == null) + return right; + + if (right == null) + return left; + + Map<ClusterNode, int[]> res = new HashMap<>(Math.min(left.size(), right.size())); + + Set<ClusterNode> keys = new HashSet<>(left.keySet()); + + keys.retainAll(right.keySet()); + + for (ClusterNode node : keys) { + int[] leftParts = left.get(node); + int[] rightParts = right.get(node); + + int[] nodeParts = new int[Math.min(leftParts.length, rightParts.length)]; + + int i = 0, j = 0, k = 0; + + while (i < leftParts.length && j < rightParts.length) { + if (leftParts[i] < rightParts[j]) + i++; + else if (rightParts[j] < leftParts[i]) + j++; + else { + nodeParts[k++] = leftParts[i]; + + i++; + j++; + } + } + + if (k > 0) + res.put(node, k < nodeParts.length ? Arrays.copyOf(nodeParts, k) : nodeParts); + } + + return res; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java index 1f5e23b..3b5445e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java @@ -25,7 +25,8 @@ import org.apache.calcite.rel.metadata.MetadataDef; import org.apache.calcite.rel.metadata.MetadataHandler; import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataQuery; -import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod; /** @@ -35,18 +36,31 @@ public class IgniteMetadata { public static final RelMetadataProvider METADATA_PROVIDER = ChainedRelMetadataProvider.of( ImmutableList.of( - DefaultRelMetadataProvider.INSTANCE, - IgniteMdDistribution.SOURCE)); + IgniteMdDistribution.SOURCE, + IgniteMdSourceDistribution.SOURCE, + DefaultRelMetadataProvider.INSTANCE)); - public interface Distribution extends Metadata { - MetadataDef<Distribution> DEF = MetadataDef.of(Distribution.class, Distribution.Handler.class, IgniteMethod.DISTRIBUTION.method()); + public interface TraitDistribution extends Metadata { + MetadataDef<TraitDistribution> DEF = MetadataDef.of(TraitDistribution.class, TraitDistribution.Handler.class, IgniteMethod.DISTRIBUTION.method()); /** Determines how the rows are distributed. */ - IgniteDistribution distribution(); + DistributionTrait distribution(); /** Handler API. */ - interface Handler extends MetadataHandler<Distribution> { - IgniteDistribution distribution(RelNode r, RelMetadataQuery mq); + interface Handler extends MetadataHandler<TraitDistribution> { + DistributionTrait distribution(RelNode r, RelMetadataQuery mq); + } + } + + public interface TaskDistribution extends Metadata { + MetadataDef<TaskDistribution> DEF = MetadataDef.of(TaskDistribution.class, TaskDistribution.Handler.class, IgniteMethod.TASK_DISTRIBUTION.method()); + + /** Determines how the rows are distributed. */ + SourceDistribution distribution(); + + /** Handler API. */ + interface Handler extends MetadataHandler<TaskDistribution> { + SourceDistribution distribution(RelNode r, RelMetadataQuery mq); } } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java index 03c4682..ed9bee6 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java @@ -26,8 +26,8 @@ import org.apache.calcite.rel.SingleRel; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.util.Util; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; -import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; -import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributionTraitDef; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; /** * @@ -51,12 +51,12 @@ public final class IgniteLogicalExchange extends SingleRel implements IgniteRel Util.nLogN(rowCount) * bytesPerRow, rowCount, 0); } - public IgniteDistribution sourceDistribution() { - return getInput().getTraitSet().getTrait(IgniteDistributionTraitDef.INSTANCE); + public DistributionTrait sourceDistribution() { + return getInput().getTraitSet().getTrait(DistributionTraitDef.INSTANCE); } - public IgniteDistribution targetDistribution() { - return getTraitSet().getTrait(IgniteDistributionTraitDef.INSTANCE); + public DistributionTrait targetDistribution() { + return getTraitSet().getTrait(DistributionTraitDef.INSTANCE); } @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java index e34c8f7..8698573 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java @@ -23,10 +23,11 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.TableScan; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable; +import org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution; public final class IgniteLogicalTableScan extends TableScan implements IgniteRel { - public IgniteLogicalTableScan(RelOptCluster cluster, RelTraitSet traitSet, - RelOptTable table) { + public IgniteLogicalTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) { super(cluster, traitSet, table); } @@ -35,4 +36,8 @@ public final class IgniteLogicalTableScan extends TableScan implements IgniteRel return this; } + + public SourceDistribution tableDistribution() { + return getTable().unwrap(IgniteTable.class).tableDistribution(getCluster().getPlanner().getContext()); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java index 7458c97..a5abdea 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java @@ -27,8 +27,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin; -import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; -import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributionTraitDef; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; import org.apache.ignite.internal.processors.query.calcite.util.Commons; @@ -45,16 +43,13 @@ public class IgniteJoinRule extends RelOptRule { @Override public void onMatch(RelOptRuleCall call) { LogicalJoin join = call.rel(0); - IgniteDistribution leftDist = join.getLeft().getTraitSet().getTrait(IgniteDistributionTraitDef.INSTANCE); - IgniteDistribution rightDist = join.getRight().getTraitSet().getTrait(IgniteDistributionTraitDef.INSTANCE); - RelTraitSet leftTraits = join.getLeft().getTraitSet() .replace(IgniteRel.LOGICAL_CONVENTION) - .replace(IgniteDistributions.hash(join.analyzeCondition().leftKeys, leftDist.sources())); + .replace(IgniteDistributions.hash(join.analyzeCondition().leftKeys)); RelTraitSet rightTraits = join.getRight().getTraitSet() .replace(IgniteRel.LOGICAL_CONVENTION) - .replace(IgniteDistributions.hash(join.analyzeCondition().rightKeys, rightDist.sources())); + .replace(IgniteDistributions.hash(join.analyzeCondition().rightKeys)); RelNode left = convert(join.getLeft(), leftTraits); RelNode right = convert(join.getRight(), rightTraits); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaChangeListener.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaChangeListener.java deleted file mode 100644 index 67cca7c..0000000 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaChangeListener.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.calcite.schema; - -import java.util.HashMap; -import java.util.Map; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.tools.Frameworks; -import org.apache.ignite.internal.processors.cache.GridCacheContextInfo; -import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; -import org.apache.ignite.internal.processors.query.schema.SchemaChangeListener; - -/** - * - */ -public class CalciteSchemaChangeListener implements SchemaChangeListener { - private final Map<String, IgniteSchema> schemas = new HashMap<>(); - private final CalciteSchemaHolder schemaHolder; - - public CalciteSchemaChangeListener(CalciteSchemaHolder schemaHolder) { - this.schemaHolder = schemaHolder; - } - - @Override public synchronized void onSchemaCreate(String schemaName) { - schemas.putIfAbsent(schemaName, new IgniteSchema(schemaName)); - rebuild(); - } - - @Override public synchronized void onSchemaDrop(String schemaName) { - schemas.remove(schemaName); - rebuild(); - } - - @Override public synchronized void onSqlTypeCreate(String schemaName, GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo cacheInfo) { - schemas.computeIfAbsent(schemaName, IgniteSchema::new).onSqlTypeCreate(typeDescriptor, cacheInfo); - rebuild(); - } - - @Override public synchronized void onSqlTypeDrop(String schemaName, GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo cacheInfo) { - schemas.computeIfAbsent(schemaName, IgniteSchema::new).onSqlTypeDrop(typeDescriptor, cacheInfo); - rebuild(); - } - - public void rebuild() { - SchemaPlus schema = Frameworks.createRootSchema(false); - schemas.forEach(schema::add); - schemaHolder.schema(schema); - } -} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaHolder.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaHolder.java index 3978ae5..fb537e1 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaHolder.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaHolder.java @@ -17,23 +17,52 @@ package org.apache.ignite.internal.processors.query.calcite.schema; -import java.util.Objects; +import java.util.HashMap; +import java.util.Map; import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.tools.Frameworks; +import org.apache.ignite.internal.processors.cache.GridCacheContextInfo; +import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; +import org.apache.ignite.internal.processors.query.schema.SchemaChangeListener; /** * */ -public class CalciteSchemaHolder implements SchemaProvider { +public class CalciteSchemaHolder implements SchemaChangeListener { + private final Map<String, IgniteSchema> schemas = new HashMap<>(); private volatile SchemaPlus schema; - @Override public SchemaPlus schema() { - return Objects.requireNonNull(schema); - } - - /** - * @param schema Calcite schema. - */ public void schema(SchemaPlus schema) { this.schema = schema; } + + public SchemaPlus schema() { + return schema; + } + + @Override public synchronized void onSchemaCreate(String schemaName) { + schemas.putIfAbsent(schemaName, new IgniteSchema(schemaName)); + rebuild(); + } + + @Override public synchronized void onSchemaDrop(String schemaName) { + schemas.remove(schemaName); + rebuild(); + } + + @Override public synchronized void onSqlTypeCreate(String schemaName, GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo cacheInfo) { + schemas.computeIfAbsent(schemaName, IgniteSchema::new).onSqlTypeCreate(typeDescriptor, cacheInfo); + rebuild(); + } + + @Override public synchronized void onSqlTypeDrop(String schemaName, GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo cacheInfo) { + schemas.computeIfAbsent(schemaName, IgniteSchema::new).onSqlTypeDrop(typeDescriptor, cacheInfo); + rebuild(); + } + + private void rebuild() { + SchemaPlus schema = Frameworks.createRootSchema(false); + schemas.forEach(schema::add); + schema(schema); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java index bf95938..81f7265 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java @@ -55,7 +55,7 @@ public class IgniteSchema extends AbstractSchema { * @param cacheInfo Cache info. */ public void onSqlTypeCreate(GridQueryTypeDescriptor typeDesc, GridCacheContextInfo cacheInfo) { - IgniteTable table = new IgniteTable(typeDesc.tableName(), cacheInfo.name(), Commons.rowTypeFunction(typeDesc), null); + IgniteTable table = new IgniteTable(typeDesc.tableName(), cacheInfo.name(), Commons.rowType(typeDesc)); addTable(table.tableName(), table); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java index ef1606f..3cbda12 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java @@ -17,39 +17,35 @@ package org.apache.ignite.internal.processors.query.calcite.schema; -import java.util.function.Function; +import org.apache.calcite.plan.Context; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.schema.Statistic; -import org.apache.calcite.schema.Statistics; import org.apache.calcite.schema.TranslatableTable; import org.apache.calcite.schema.impl.AbstractTable; -import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.calcite.exchange.DistributionRegistry; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan; -import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributionTraitDef; +import org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.util.GridIntList; +import org.apache.ignite.internal.util.typedef.internal.CU; /** */ public class IgniteTable extends AbstractTable implements TranslatableTable { private final String tableName; private final String cacheName; - private final Function<RelDataTypeFactory, RelDataType> rowType; - private final Statistic statistic; + private final RowType rowType; - - public IgniteTable(String tableName, String cacheName, - Function<RelDataTypeFactory, RelDataType> rowType, @Nullable Statistic statistic) { + public IgniteTable(String tableName, String cacheName, RowType rowType) { this.tableName = tableName; this.cacheName = cacheName; this.rowType = rowType; - - this.statistic = statistic == null ? Statistics.UNKNOWN : statistic; } /** @@ -67,21 +63,29 @@ public class IgniteTable extends AbstractTable implements TranslatableTable { } /** {@inheritDoc} */ - @Override public Statistic getStatistic() { - return statistic; - } - - /** {@inheritDoc} */ @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return rowType.apply(typeFactory); + return rowType.asRelDataType(typeFactory); } /** {@inheritDoc} */ @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { RelOptCluster cluster = context.getCluster(); - RelTraitSet traitSet = cluster.traitSet() - .replace(IgniteRel.LOGICAL_CONVENTION) - .replaceIf(IgniteDistributionTraitDef.INSTANCE, () -> IgniteDistributions.hash(ImmutableIntList.of(0), ImmutableIntList.of())); + RelTraitSet traitSet = cluster.traitSet().replace(IgniteRel.LOGICAL_CONVENTION) + .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteDistributions.hash(rowType.distributionKeys())); return new IgniteLogicalTableScan(cluster, traitSet, relOptTable); } + + public SourceDistribution tableDistribution(Context context) { + SourceDistribution res = new SourceDistribution(); + + res.localInputs = new GridIntList(); + res.localInputs.add(CU.cacheId(cacheName)); + + DistributionRegistry registry = context.unwrap(DistributionRegistry.class); + AffinityTopologyVersion topVer = context.unwrap(AffinityTopologyVersion.class); + + res.partitionMapping = registry.partitionMapping(CU.cacheId(cacheName), topVer); + + return res; + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/RowType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/RowType.java new file mode 100644 index 0000000..0da2c70 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/RowType.java @@ -0,0 +1,140 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.schema; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.LinkedHashSet; +import java.util.List; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.util.ImmutableIntList; + +/** + * + */ +public class RowType { + private final String[] fields; + private final Class[] types; + private final BitSet keyFields; + private final int affinityKey; + + public RowType(String[] fields, Class[] types, BitSet keyFields, int affinityKey) { + this.fields = fields; + this.types = types; + this.keyFields = keyFields; + this.affinityKey = affinityKey; + } + + public RelDataType asRelDataType(RelDataTypeFactory factory) { + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(factory); + + int len = fields.length; + + for (int i = 0; i < len; i++) + builder.add(fields[i], factory.createJavaType(types[i])); + + return builder.build(); + } + + public List<Integer> distributionKeys() { + return ImmutableIntList.of(affinityKey); + } + + public boolean isKeyField(int idx) { + return keyFields.get(idx); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private int affinityKey; + private final LinkedHashSet<String> fields; + private final BitSet keyFields; + private final ArrayList<Class> types; + + private Builder() { + fields = new LinkedHashSet<>(); + types = new ArrayList<>(); + keyFields = new BitSet(); + + fields.add("_key"); types.add(Object.class); + fields.add("_val"); types.add(Object.class); + } + + public Builder key(Class type) { + if (types.get(0) != Object.class && types.get(0) != type) + throw new IllegalStateException("Key type is already set."); + + types.set(0, type); + + return this; + } + + public Builder val(Class type) { + if (types.get(1) != Object.class && types.get(1) != type) + throw new IllegalStateException("Value type is already set."); + + types.set(1, type); + + return this; + } + + public Builder field(String name, Class type) { + if (!fields.add(name)) + throw new IllegalStateException("Field name must be unique."); + + types.add(type); + + return this; + } + + public Builder keyField(String name, Class type) { + if (!fields.add(name)) + throw new IllegalStateException("Field name must be unique."); + + types.add(type); + + keyFields.set(types.size() - 1); + + return this; + } + + public Builder keyField(String name, Class type, boolean affinityKey) { + if (affinityKey && this.affinityKey > 0) + throw new IllegalStateException("Affinity key field must be unique."); + + if (!fields.add(name)) + throw new IllegalStateException("Field name must be unique."); + + types.add(type); + + keyFields.set(types.size() - 1); + + if (affinityKey) + this.affinityKey = types.size() - 1; + + return this; + } + + public RowType build() { + return new RowType(fields.toArray(new String[0]), types.toArray(new Class[0]), keyFields, affinityKey); + } + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaProvider.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaProvider.java deleted file mode 100644 index 5fdd311..0000000 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaProvider.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.calcite.schema; - -import org.apache.calcite.schema.SchemaPlus; - -/** - * - */ -public interface SchemaProvider { - SchemaPlus schema(); -} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java deleted file mode 100644 index 8c6cc61..0000000 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.calcite.schema; - -/** - * - */ -public interface TableDescriptor { - public boolean partitioned(); - - public boolean replicated(); -} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java new file mode 100644 index 0000000..eded34e --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java @@ -0,0 +1,32 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.splitter; + +import java.util.List; +import java.util.Map; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver; +import org.apache.ignite.internal.util.GridIntList; + +/** + * + */ +public class SourceDistribution { + public Map<ClusterNode, int[]> partitionMapping; // partition filter for unstable topology + public List<Receiver> remoteInputs; // remote inputs to notify particular senders about final task distribution + public GridIntList localInputs; // involved caches, used for partitions reservation +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java new file mode 100644 index 0000000..f405939 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java @@ -0,0 +1,32 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.splitter; + +import org.apache.calcite.rel.RelNode; + +/** + * + */ +public class SplitTask { + public final SourceDistribution distribution; + public final RelNode root; + + public SplitTask(RelNode root, SourceDistribution distribution) { + this.distribution = distribution; + this.root = root; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java new file mode 100644 index 0000000..4ca6716 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java @@ -0,0 +1,26 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.splitter; + +import org.apache.calcite.rel.RelShuttleImpl; + +/** + * + */ +public class TaskSplitter extends RelShuttleImpl { + +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistribution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java similarity index 75% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistribution.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java index f80fb2f..3dabee1 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistribution.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java @@ -19,13 +19,12 @@ package org.apache.ignite.internal.processors.query.calcite.trait; import org.apache.calcite.plan.RelTrait; import org.apache.calcite.plan.RelTraitDef; import org.apache.calcite.util.ImmutableIntList; -import org.apache.calcite.util.Util; /** * */ -public interface IgniteDistribution extends RelTrait { - public enum DistributionType { +public interface DistributionTrait extends RelTrait { + enum DistributionType { HASH("hash"), RANDOM("random"), BROADCAST("broadcast"), @@ -46,25 +45,22 @@ public interface IgniteDistribution extends RelTrait { } } - IgniteDistribution ANY = new IgniteDistributionImpl(DistributionType.ANY, ImmutableIntList.of()); + DistributionTrait ANY = IgniteDistributions.single(); DistributionType type(); @Override default RelTraitDef getTraitDef() { - return IgniteDistributionTraitDef.INSTANCE; + return DistributionTraitDef.INSTANCE; } @Override default boolean satisfies(RelTrait trait) { if (trait == this) return true; - if (!(trait instanceof IgniteDistribution)) + if (!(trait instanceof DistributionTrait)) return false; - IgniteDistribution other = (IgniteDistribution) trait; - - if (!Util.startsWith(other.sources(), sources())) - return false; + DistributionTrait other = (DistributionTrait) trait; if (other.type() == DistributionType.ANY) return true; @@ -79,9 +75,4 @@ public interface IgniteDistribution extends RelTrait { * @return Hash distribution columns ordinals or empty list otherwise. */ ImmutableIntList keys(); - - /** - * @return Sources (indexes of sorted by node order nodes list for particular topology). - */ - ImmutableIntList sources(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributionTraitDef.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java similarity index 73% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributionTraitDef.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java index 85e1999..facc258 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributionTraitDef.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java @@ -25,20 +25,20 @@ import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLog /** * */ -public class IgniteDistributionTraitDef extends RelTraitDef<IgniteDistribution> { +public class DistributionTraitDef extends RelTraitDef<DistributionTrait> { /** */ - public static final IgniteDistributionTraitDef INSTANCE = new IgniteDistributionTraitDef(); + public static final DistributionTraitDef INSTANCE = new DistributionTraitDef(); - @Override public Class<IgniteDistribution> getTraitClass() { - return IgniteDistribution.class; + @Override public Class<DistributionTrait> getTraitClass() { + return DistributionTrait.class; } @Override public String getSimpleName() { return "distr"; } - @Override public RelNode convert(RelOptPlanner planner, RelNode rel, IgniteDistribution targetDist, boolean allowInfiniteCostConverters) { - IgniteDistribution srcDist = rel.getTraitSet().getTrait(INSTANCE); + @Override public RelNode convert(RelOptPlanner planner, RelNode rel, DistributionTrait targetDist, boolean allowInfiniteCostConverters) { + DistributionTrait srcDist = rel.getTraitSet().getTrait(INSTANCE); // Source and Target have the same trait. if (srcDist.equals(targetDist)) @@ -59,11 +59,11 @@ public class IgniteDistributionTraitDef extends RelTraitDef<IgniteDistribution> } } - @Override public boolean canConvert(RelOptPlanner planner, IgniteDistribution fromTrait, IgniteDistribution toTrait) { + @Override public boolean canConvert(RelOptPlanner planner, DistributionTrait fromTrait, DistributionTrait toTrait) { return true; } - @Override public IgniteDistribution getDefault() { - return IgniteDistribution.ANY; + @Override public DistributionTrait getDefault() { + return DistributionTrait.ANY; } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributionImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java similarity index 66% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributionImpl.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java index fdc43a1..5b7b748 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributionImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java @@ -23,20 +23,13 @@ import org.apache.calcite.util.ImmutableIntList; /** * */ -public class IgniteDistributionImpl implements IgniteDistribution { +public class DistributionTraitImpl implements DistributionTrait { private final DistributionType type; private final ImmutableIntList keys; - private ImmutableIntList sources; - - public IgniteDistributionImpl(DistributionType type, ImmutableIntList keys) { - this(type, keys, ImmutableIntList.of()); - } - - public IgniteDistributionImpl(DistributionType type, ImmutableIntList keys, ImmutableIntList sources) { + public DistributionTraitImpl(DistributionType type, ImmutableIntList keys) { this.type = type; this.keys = keys; - this.sources = sources; } @Override public DistributionType type() { @@ -47,32 +40,26 @@ public class IgniteDistributionImpl implements IgniteDistribution { return keys; } - @Override public ImmutableIntList sources() { - return sources; - } - @Override public void register(RelOptPlanner planner) {} @Override public boolean equals(Object o) { if (this == o) return true; - if (o instanceof IgniteDistribution) { - IgniteDistribution that = (IgniteDistribution) o; + if (o instanceof DistributionTrait) { + DistributionTrait that = (DistributionTrait) o; - return type == that.type() - && keys.equals(that.keys()) - && sources.equals(that.sources()); + return type == that.type() && keys.equals(that.keys()); } return false; } @Override public int hashCode() { - return Objects.hash(type, keys, sources); + return Objects.hash(type, keys); } @Override public String toString() { - return type + (type == DistributionType.HASH ? keys.toString() : "") + sources; + return type + (type == DistributionType.HASH ? keys.toString() : ""); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java index 91f2176..7537d63 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java @@ -19,26 +19,31 @@ package org.apache.ignite.internal.processors.query.calcite.trait; import java.util.List; import org.apache.calcite.util.ImmutableIntList; -import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution.DistributionType.HASH; -import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution.DistributionType.RANDOM; -import static org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution.DistributionType.SINGLE; +import static org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait.DistributionType.HASH; /** * */ public class IgniteDistributions { /** */ - private static final IgniteDistributionTraitDef traitDef = IgniteDistributionTraitDef.INSTANCE; + private static final DistributionTraitDef traitDef = DistributionTraitDef.INSTANCE; + private static final DistributionTrait SINGLE = traitDef.canonize(new DistributionTraitImpl(DistributionTrait.DistributionType.SINGLE, ImmutableIntList.of())); + private static final DistributionTrait RANDOM = traitDef.canonize(new DistributionTraitImpl(DistributionTrait.DistributionType.RANDOM, ImmutableIntList.of())); + private static final DistributionTrait ANY = traitDef.canonize(new DistributionTraitImpl(DistributionTrait.DistributionType.ANY, ImmutableIntList.of())); - public static IgniteDistribution random(List<Integer> sources) { - return traitDef.canonize(new IgniteDistributionImpl(RANDOM, ImmutableIntList.of(), ImmutableIntList.copyOf(sources))); + public static DistributionTrait any() { + return ANY; } - public static IgniteDistribution hash(List<Integer> keys, List<Integer> sources) { - return traitDef.canonize(new IgniteDistributionImpl(HASH, ImmutableIntList.copyOf(keys), ImmutableIntList.copyOf(sources))); + public static DistributionTrait random() { + return RANDOM; } - public static IgniteDistribution single(List<Integer> sources) { - return traitDef.canonize(new IgniteDistributionImpl(SINGLE, ImmutableIntList.of(), ImmutableIntList.copyOf(sources))); + public static DistributionTrait single() { + return SINGLE; + } + + public static DistributionTrait hash(List<Integer> keys) { + return traitDef.canonize(new DistributionTraitImpl(HASH, ImmutableIntList.copyOf(keys))); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java index 0fc14b0..b638815 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java @@ -20,9 +20,10 @@ package org.apache.ignite.internal.processors.query.calcite.util; import java.util.Collections; import java.util.IdentityHashMap; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.function.BiFunction; -import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.calcite.plan.Context; import org.apache.calcite.plan.Contexts; @@ -34,12 +35,12 @@ import org.apache.calcite.plan.RelTrait; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.QueryContext; -import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.schema.RowType; +import org.jetbrains.annotations.Nullable; /** * @@ -51,19 +52,33 @@ public final class Commons { return ctx == null ? Contexts.empty() : Contexts.of(ctx.unwrap(Object[].class)); } + public static <T> @Nullable T contextParameter(Context ctx, Class<T> paramType, Supplier<T> paramSrc) { + T param = ctx.unwrap(paramType); + + if (param != null) + return param; + + return paramSrc.get(); + } + /** */ - public static Function<RelDataTypeFactory, RelDataType> rowTypeFunction(GridQueryTypeDescriptor desc) { - return (f) -> { - RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(f); + public static RowType rowType(GridQueryTypeDescriptor desc) { + RowType.Builder b = RowType.builder(); - builder.add(QueryUtils.KEY_FIELD_NAME, f.createJavaType(desc.keyClass())); - builder.add(QueryUtils.VAL_FIELD_NAME, f.createJavaType(desc.valueClass())); + Map<String, Class<?>> fields = desc.fields(); - for (Map.Entry<String, Class<?>> prop : desc.fields().entrySet()) { - builder.add(prop.getKey(), f.createJavaType(prop.getValue())); - } - return builder.build(); - }; + b.key(desc.keyClass()).val(desc.valueClass()); + + for (Map.Entry<String, Class<?>> entry : fields.entrySet()) { + GridQueryProperty prop = desc.property(entry.getKey()); + + if (prop.key()) + b.keyField(prop.name(), prop.type(), Objects.equals(desc.affinityKey(), prop.name())); + else + b.field(prop.name(), prop.type()); + } + + return b.build(); } public static RelOptRuleOperand any(Class<? extends RelNode> first, RelTrait trait){ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java index 7bb2572..b6a6703 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java @@ -19,13 +19,15 @@ package org.apache.ignite.internal.processors.query.calcite.util; import java.lang.reflect.Method; import org.apache.calcite.linq4j.tree.Types; -import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.Distribution; +import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.TaskDistribution; +import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.TraitDistribution; /** * */ public enum IgniteMethod { - DISTRIBUTION(Distribution.class, "distribution"); + DISTRIBUTION(TraitDistribution.class, "distribution"), + TASK_DISTRIBUTION(TaskDistribution.class, "distribution"); private final Method method; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java index 7d22d51..c8d8a16 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java @@ -89,7 +89,7 @@ public class ScanIterator<T> extends GridCloseableIteratorAdapter<T> { IgniteCacheOffheapManager.CacheDataStore ds = part.dataStore(); - cur = cacheId == UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId); + cur = cacheId == UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId, false); } else break; } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java index 25298e4..028bb9c 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java @@ -25,11 +25,10 @@ import org.apache.calcite.plan.RelTraitDef; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; -import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.tools.Frameworks; -import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner; import org.apache.ignite.internal.processors.query.calcite.prepare.Query; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; @@ -37,7 +36,8 @@ import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase; import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType; import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable; -import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributionTraitDef; +import org.apache.ignite.internal.processors.query.calcite.schema.RowType; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.WithSystemProperty; @@ -45,9 +45,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.BeforeClass; import org.junit.Test; -import static org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME; -import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME; - /** * */ @@ -55,6 +52,7 @@ import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_N public class CalciteQueryProcessorTest extends GridCommonAbstractTest { private static CalciteQueryProcessor proc; + private static SchemaPlus schema; @BeforeClass public static void setupClass() { @@ -65,60 +63,38 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { IgniteSchema publicSchema = new IgniteSchema("PUBLIC"); - publicSchema.addTable("Developer", new IgniteTable("Developer", "Developer", (f) -> { - RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(f); - - builder.add(KEY_FIELD_NAME, f.createJavaType(Integer.class)); - builder.add(VAL_FIELD_NAME, f.createJavaType(Integer.class)); - builder.add("id", f.createJavaType(Integer.class)); - builder.add("name", f.createJavaType(String.class)); - builder.add("projectId", f.createJavaType(Integer.class)); - builder.add("cityId", f.createJavaType(Integer.class)); - - return builder.build(); - }, null)); - - publicSchema.addTable("Project", new IgniteTable("Project", "Project", (f) -> { - RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(f); - - builder.add(KEY_FIELD_NAME, f.createJavaType(Integer.class)); - builder.add(VAL_FIELD_NAME, f.createJavaType(Integer.class)); - builder.add("id", f.createJavaType(Integer.class)); - builder.add("name", f.createJavaType(String.class)); - builder.add("ver", f.createJavaType(Integer.class)); - - return builder.build(); - }, null)); - - publicSchema.addTable("Country", new IgniteTable("Country", "Country", (f) -> { - RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(f); - - builder.add(KEY_FIELD_NAME, f.createJavaType(Integer.class)); - builder.add(VAL_FIELD_NAME, f.createJavaType(Integer.class)); - builder.add("id", f.createJavaType(Integer.class)); - builder.add("name", f.createJavaType(String.class)); - builder.add("countryCode", f.createJavaType(Integer.class)); - - return builder.build(); - }, null)); - - publicSchema.addTable("City", new IgniteTable("City", "City", (f) -> { - RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(f); - - builder.add(KEY_FIELD_NAME, f.createJavaType(Integer.class)); - builder.add(VAL_FIELD_NAME, f.createJavaType(Integer.class)); - builder.add("id", f.createJavaType(Integer.class)); - builder.add("name", f.createJavaType(String.class)); - builder.add("countryId", f.createJavaType(Integer.class)); - - return builder.build(); - }, null)); - - SchemaPlus schema = Frameworks.createRootSchema(false); + publicSchema.addTable("Developer", new IgniteTable("Developer", "Developer", + RowType.builder() + .keyField("id", Integer.class, true) + .field("name", String.class) + .field("projectId", Integer.class) + .field("cityId", Integer.class) + .build())); + + publicSchema.addTable("Project", new IgniteTable("Project", "Project", + RowType.builder() + .keyField("id", Integer.class, true) + .field("name", String.class) + .field("ver", Integer.class) + .build())); + + publicSchema.addTable("Country", new IgniteTable("Country", "Country", + RowType.builder() + .keyField("id", Integer.class, true) + .field("name", String.class) + .field("countryCode", Integer.class) + .build())); + + publicSchema.addTable("City", new IgniteTable("City", "City", + RowType.builder() + .keyField("id", Integer.class, true) + .field("name", String.class) + .field("countryId", Integer.class) + .build())); + + schema = Frameworks.createRootSchema(false); schema.add("PUBLIC", publicSchema); - - proc.schemaHolder().schema(schema); } @Test @@ -130,12 +106,12 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { "ON d.projectId = p.id0 " + "WHERE (d.projectId + 1) > ?"; - Context ctx = proc.context(Contexts.empty(), sql, new Object[]{2}); + Context ctx = proc.context(Contexts.of(schema, AffinityTopologyVersion.NONE), sql, new Object[]{2}); assertNotNull(ctx); RelTraitDef[] traitDefs = { - IgniteDistributionTraitDef.INSTANCE, + DistributionTraitDef.INSTANCE, ConventionTraitDef.INSTANCE }; @@ -164,7 +140,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { RelTraitSet desired = rel.getCluster().traitSet() .replace(IgniteRel.LOGICAL_CONVENTION) - .replace(IgniteDistributions.single(ImmutableIntList.of())) + .replace(IgniteDistributions.single()) .simplify(); rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
