This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 9179fd00cc2016f5fa1d219fa5a462c36b00c319
Author: Igor Seliverstov <[email protected]>
AuthorDate: Fri Oct 25 17:52:53 2019 +0300

    pending
---
 .../query/calcite/CalciteQueryProcessor.java       |  37 ++---
 .../calcite/exchange/DistributionRegistry.java     |  28 ++++
 .../query/calcite/exchange/Receiver.java           |  48 +++++++
 .../processors/query/calcite/exchange/Sender.java  |  39 ++++++
 .../calcite/metadata/IgniteMdDistribution.java     |  44 +++---
 .../metadata/IgniteMdSourceDistribution.java       | 155 +++++++++++++++++++++
 .../query/calcite/metadata/IgniteMetadata.java     |  30 ++--
 .../calcite/rel/logical/IgniteLogicalExchange.java |  12 +-
 .../rel/logical/IgniteLogicalTableScan.java        |   9 +-
 .../query/calcite/rule/logical/IgniteJoinRule.java |   9 +-
 .../schema/CalciteSchemaChangeListener.java        |  64 ---------
 .../query/calcite/schema/CalciteSchemaHolder.java  |  47 +++++--
 .../query/calcite/schema/IgniteSchema.java         |   2 +-
 .../query/calcite/schema/IgniteTable.java          |  48 ++++---
 .../processors/query/calcite/schema/RowType.java   | 140 +++++++++++++++++++
 .../query/calcite/schema/SchemaProvider.java       |  27 ----
 .../query/calcite/schema/TableDescriptor.java      |  27 ----
 .../query/calcite/splitter/SourceDistribution.java |  32 +++++
 .../query/calcite/splitter/SplitTask.java          |  32 +++++
 .../query/calcite/splitter/TaskSplitter.java       |  26 ++++
 ...iteDistribution.java => DistributionTrait.java} |  21 +--
 ...tionTraitDef.java => DistributionTraitDef.java} |  18 +--
 ...ibutionImpl.java => DistributionTraitImpl.java} |  27 +---
 .../query/calcite/trait/IgniteDistributions.java   |  25 ++--
 .../processors/query/calcite/util/Commons.java     |  43 ++++--
 .../query/calcite/util/IgniteMethod.java           |   6 +-
 .../query/calcite/util/ScanIterator.java           |   2 +-
 .../query/calcite/CalciteQueryProcessorTest.java   |  98 +++++--------
 28 files changed, 751 insertions(+), 345 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 737f19b..a024027 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -33,6 +33,7 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryContext;
 import org.apache.ignite.internal.processors.query.QueryEngine;
@@ -40,7 +41,6 @@ import 
org.apache.ignite.internal.processors.query.calcite.prepare.DistributedEx
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.QueryExecution;
-import 
org.apache.ignite.internal.processors.query.calcite.schema.CalciteSchemaChangeListener;
 import 
org.apache.ignite.internal.processors.query.calcite.schema.CalciteSchemaHolder;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import 
org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
@@ -48,6 +48,8 @@ import org.apache.ignite.resources.LoggerResource;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static 
org.apache.ignite.internal.processors.query.calcite.util.Commons.contextParameter;
+
 /**
  *
  */
@@ -62,7 +64,7 @@ public class CalciteQueryProcessor implements QueryEngine {
     private IgniteLogger log;
 
     /** */
-    private GridKernalContext ctx;
+    private GridKernalContext kernalContext;
 
     public CalciteQueryProcessor() {
         config = Frameworks.newConfigBuilder()
@@ -94,12 +96,12 @@ public class CalciteQueryProcessor implements QueryEngine {
 
     /** {@inheritDoc} */
     @Override public void start(@NotNull GridKernalContext ctx) {
-        this.ctx = ctx;
+        kernalContext = ctx;
 
         GridInternalSubscriptionProcessor prc = 
ctx.internalSubscriptionProcessor();
 
         if (prc != null) // Stubbed context doesn't have such processor
-            prc.registerSchemaChangeListener(new 
CalciteSchemaChangeListener(schemaHolder));
+            prc.registerSchemaChangeListener(schemaHolder);
     }
 
     /** {@inheritDoc} */
@@ -122,7 +124,7 @@ public class CalciteQueryProcessor implements QueryEngine {
     }
 
     public GridKernalContext context() {
-        return ctx;
+        return kernalContext;
     }
 
     /** */
@@ -136,10 +138,6 @@ public class CalciteQueryProcessor implements QueryEngine {
         return new IgnitePlanner(cfg);
     }
 
-    private QueryExecution prepare(Context ctx) {
-        return new DistributedExecution(ctx);
-    }
-
     /**
      * @param ctx External context.
      * @param query Query string.
@@ -147,16 +145,19 @@ public class CalciteQueryProcessor implements QueryEngine 
{
      * @return Query execution context.
      */
     Context context(@NotNull Context ctx, String query, Object[] params) { // 
Package private visibility for tests.
-        return Contexts.chain(
-            config.getContext(),
-            Contexts.of(schemaHolder.schema(), new Query(query, params)),
-            ctx);
+        return Contexts.chain(ctx,
+            Contexts.of(
+                new Query(query, params),
+                contextParameter(ctx, SchemaPlus.class, schemaHolder::schema),
+                contextParameter(ctx, AffinityTopologyVersion.class, 
this::readyAffinityVersion)),
+            config.getContext());
     }
 
-    /**
-     * @return Schema provider.
-     */
-    CalciteSchemaHolder schemaHolder() { // Package private visibility for 
tests.
-        return schemaHolder;
+    private QueryExecution prepare(Context ctx) {
+        return new DistributedExecution(ctx);
+    }
+
+    private AffinityTopologyVersion readyAffinityVersion() {
+        return 
kernalContext.cache().context().exchange().readyAffinityVersion();
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/DistributionRegistry.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/DistributionRegistry.java
new file mode 100644
index 0000000..a2b0942
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/DistributionRegistry.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exchange;
+
+import java.util.Map;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+
+/**
+ * Provides, for a cache id and an affinity topology version, a per-node mapping to int arrays (presumably partitions).
+ */
+public interface DistributionRegistry {
+    Map<ClusterNode, int[]> partitionMapping(int cacheId, 
AffinityTopologyVersion topVer);
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
new file mode 100644
index 0000000..8d2ee1c
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Receiver.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exchange;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+
+/**
+ * Single-input relational node whose input is always a {@link Sender}.
+ */
+public class Receiver extends SingleRel implements IgniteRel {
+    /**
+     * @param cluster Cluster this relational expression belongs to.
+     * @param traits Trait set.
+     * @param sender Corresponding sender; passed to the superclass as the single input.
+     */
+    protected Receiver(RelOptCluster cluster, RelTraitSet traits, Sender 
sender) {
+        super(cluster, traits, sender);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Sender getInput() {
+        return (Sender) input; // Return type is narrowed to Sender; the cast fails if the input is another node type.
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new Receiver(getCluster(), traitSet, (Sender) sole(inputs)); // Sole input must be a Sender.
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
new file mode 100644
index 0000000..5049250
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Sender.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exchange;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+
+/**
+ * Single-input relational node; {@link Receiver} accepts only a Sender as its input.
+ */
+public class Sender extends SingleRel implements IgniteRel {
+    /**
+     * Creates a <code>Sender</code>.
+     *
+     * @param cluster Cluster this relational expression belongs to.
+     * @param traits Trait set.
+     * @param input Input relational expression.
+     */
+    protected Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input) 
{
+        super(cluster, traits, input);
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
index a35c342..ded1a8d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
@@ -37,52 +37,52 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexSlot;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.ImmutableIntList;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributionTraitDef;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
 
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution.DistributionType.HASH;
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait.DistributionType.HASH;
 
 /**
  *
  */
-public class IgniteMdDistribution implements 
MetadataHandler<IgniteMetadata.Distribution> {
+public class IgniteMdDistribution implements 
MetadataHandler<IgniteMetadata.TraitDistribution> {
     public static final RelMetadataProvider SOURCE =
         
ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.DISTRIBUTION.method(),
 new IgniteMdDistribution());
 
-    @Override public MetadataDef<IgniteMetadata.Distribution> getDef() {
-        return IgniteMetadata.Distribution.DEF;
+    @Override public MetadataDef<IgniteMetadata.TraitDistribution> getDef() {
+        return IgniteMetadata.TraitDistribution.DEF;
     }
 
-    public IgniteDistribution distribution(RelNode rel, RelMetadataQuery mq) {
-        return IgniteDistributionTraitDef.INSTANCE.getDefault();
+    public DistributionTrait distribution(RelNode rel, RelMetadataQuery mq) {
+        return DistributionTraitDef.INSTANCE.getDefault();
     }
 
-    public IgniteDistribution distribution(Filter filter, RelMetadataQuery mq) 
{
+    public DistributionTrait distribution(Filter filter, RelMetadataQuery mq) {
         return filter(mq, filter.getInput(), filter.getCondition());
     }
 
-    public IgniteDistribution distribution(Project project, RelMetadataQuery 
mq) {
+    public DistributionTrait distribution(Project project, RelMetadataQuery 
mq) {
         return project(mq, project.getInput(), project.getProjects());
     }
 
-    public IgniteDistribution distribution(Join join, RelMetadataQuery mq) {
+    public DistributionTrait distribution(Join join, RelMetadataQuery mq) {
         return join(mq, join.getLeft(), join.getRight(), join.getCondition());
     }
 
-    public IgniteDistribution distribution(RelSubset rel, RelMetadataQuery mq) 
{
-        return rel.getTraitSet().getTrait(IgniteDistributionTraitDef.INSTANCE);
+    public DistributionTrait distribution(RelSubset rel, RelMetadataQuery mq) {
+        return rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
     }
 
-    public static IgniteDistribution project(RelMetadataQuery mq, RelNode 
input, List<RexNode> projects) {
-        IgniteDistribution trait = distribution_(input, mq);
+    public static DistributionTrait project(RelMetadataQuery mq, RelNode 
input, List<RexNode> projects) {
+        DistributionTrait trait = distribution_(input, mq);
 
         if (trait.type() == HASH) {
             ImmutableIntList keys = trait.keys();
 
             if (keys.size() > projects.size())
-                return IgniteDistributions.random(trait.sources());
+                return IgniteDistributions.random();
 
             Map<Integer, Integer> m = new HashMap<>(projects.size());
 
@@ -103,26 +103,26 @@ public class IgniteMdDistribution implements 
MetadataHandler<IgniteMetadata.Dist
                 Integer mapped = m.get(key);
 
                 if (mapped == null)
-                    return IgniteDistributions.random(trait.sources());
+                    return IgniteDistributions.random();
 
                 newKeys.add(mapped);
             }
 
-            return IgniteDistributions.hash(newKeys, trait.sources());
+            return IgniteDistributions.hash(newKeys);
         }
 
         return trait;
     }
 
-    public static IgniteDistribution filter(RelMetadataQuery mq, RelNode 
input, RexNode condition) {
+    public static DistributionTrait filter(RelMetadataQuery mq, RelNode input, 
RexNode condition) {
         return distribution_(input, mq);
     }
 
-    public static IgniteDistribution join(RelMetadataQuery mq, RelNode left, 
RelNode right, RexNode condition) {
+    public static DistributionTrait join(RelMetadataQuery mq, RelNode left, 
RelNode right, RexNode condition) {
         return distribution_(left, mq);
     }
 
-    public static IgniteDistribution distribution_(RelNode rel, 
RelMetadataQuery mq) {
-        return rel.metadata(IgniteMetadata.Distribution.class, 
mq).distribution();
+    public static DistributionTrait distribution_(RelNode rel, 
RelMetadataQuery mq) {
+        return rel.metadata(IgniteMetadata.TraitDistribution.class, 
mq).distribution();
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
new file mode 100644
index 0000000..1c80b96
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSourceDistribution.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.MetadataDef;
+import org.apache.calcite.rel.metadata.MetadataHandler;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.TaskDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
+import org.apache.ignite.internal.util.GridIntList;
+
+/**
+ * Metadata handler that supplies {@link SourceDistribution} for relational nodes ({@link TaskDistribution} metadata).
+ */
+public class IgniteMdSourceDistribution implements 
MetadataHandler<TaskDistribution> {
+    public static final RelMetadataProvider SOURCE =
+        
ReflectiveRelMetadataProvider.reflectiveSource(IgniteMethod.TASK_DISTRIBUTION.method(),
 new IgniteMdSourceDistribution());
+
+    @Override public MetadataDef<TaskDistribution> getDef() {
+        return TaskDistribution.DEF;
+    }
+
+    public SourceDistribution distribution(RelNode rel, RelMetadataQuery mq) {
+        throw new AssertionError(); // Presumably reached only for node types without a more specific overload.
+    }
+
+    public SourceDistribution distribution(RelSubset rel, RelMetadataQuery mq) 
{
+        throw new AssertionError(); // RelSubset is rejected outright; no distribution is derived for it here.
+    }
+
+    public SourceDistribution distribution(SingleRel rel, RelMetadataQuery mq) 
{
+        return distribution_(rel.getInput(), mq); // A single-input node reports its input's distribution.
+    }
+
+    public SourceDistribution distribution(BiRel rel, RelMetadataQuery mq) {
+        return merge(distribution_(rel.getLeft(), mq), 
distribution_(rel.getRight(), mq));
+    }
+
+    public SourceDistribution distribution(Receiver rel, RelMetadataQuery mq) {
+        SourceDistribution res = new SourceDistribution();
+
+        res.remoteInputs.add(rel); // NOTE(review): needs remoteInputs non-null after construction - confirm.
+
+        return res;
+    }
+
+    public SourceDistribution distribution(IgniteLogicalTableScan rel, 
RelMetadataQuery mq) {
+        return rel.tableDistribution(); // Delegated to the scan node itself.
+    }
+
+    public static SourceDistribution distribution_(RelNode rel, 
RelMetadataQuery mq) {
+        return rel.metadata(TaskDistribution.class, mq).distribution();
+    }
+
+    private static SourceDistribution merge(SourceDistribution left, 
SourceDistribution right) {
+        SourceDistribution res = new SourceDistribution();
+
+        res.remoteInputs = merge(left.remoteInputs, right.remoteInputs);
+        res.partitionMapping = merge(left.partitionMapping, 
right.partitionMapping);
+        res.localInputs = merge(left.localInputs, right.localInputs);
+
+        return res;
+    }
+
+    private static <T> List<T> merge(List<T> left, List<T> right) {
+        if (left == null)
+            return right;
+
+        if (right != null)
+            left.addAll(right); // NOTE(review): appends into the caller's list in place; the same instance is returned.
+
+        return left;
+    }
+
+    private static GridIntList merge(GridIntList left, GridIntList right) {
+        if (left == null)
+            return right;
+
+        if (right != null)
+            left.addAll(right); // NOTE(review): appends into the caller's list in place; the same instance is returned.
+
+        return left;
+    }
+
+    private static Map<ClusterNode, int[]> merge(Map<ClusterNode, int[]> left, 
Map<ClusterNode, int[]> right) {
+        if (left == null)
+            return right;
+
+        if (right == null)
+            return left;
+
+        Map<ClusterNode, int[]> res = new HashMap<>(Math.min(left.size(), 
right.size()));
+
+        Set<ClusterNode> keys = new HashSet<>(left.keySet());
+
+        keys.retainAll(right.keySet()); // Only nodes present in both mappings are kept.
+
+        for (ClusterNode node : keys) {
+            int[] leftParts  = left.get(node);
+            int[] rightParts = right.get(node);
+
+            int[] nodeParts = new int[Math.min(leftParts.length, 
rightParts.length)];
+
+            int i = 0, j = 0, k = 0; // Left, right and result cursors; the scan assumes sorted arrays - TODO confirm.
+
+            while (i < leftParts.length && j < rightParts.length) {
+                if (leftParts[i] < rightParts[j])
+                    i++;
+                else if (rightParts[j] < leftParts[i])
+                    j++;
+                else {
+                    nodeParts[k++] = leftParts[i]; // Value found on both sides: keep it.
+
+                    i++;
+                    j++;
+                }
+            }
+
+            if (k > 0) // Nodes without any common value are left out of the result.
+                res.put(node, k < nodeParts.length ? Arrays.copyOf(nodeParts, 
k) : nodeParts);
+        }
+
+        return res;
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
index 1f5e23b..3b5445e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
@@ -25,7 +25,8 @@ import org.apache.calcite.rel.metadata.MetadataDef;
 import org.apache.calcite.rel.metadata.MetadataHandler;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
 
 /**
@@ -35,18 +36,31 @@ public class IgniteMetadata {
     public static final RelMetadataProvider METADATA_PROVIDER =
         ChainedRelMetadataProvider.of(
             ImmutableList.of(
-                DefaultRelMetadataProvider.INSTANCE,
-                IgniteMdDistribution.SOURCE));
+                IgniteMdDistribution.SOURCE,
+                IgniteMdSourceDistribution.SOURCE,
+                DefaultRelMetadataProvider.INSTANCE));
 
-    public interface Distribution extends Metadata {
-        MetadataDef<Distribution> DEF = MetadataDef.of(Distribution.class, 
Distribution.Handler.class, IgniteMethod.DISTRIBUTION.method());
+    public interface TraitDistribution extends Metadata {
+        MetadataDef<TraitDistribution> DEF = 
MetadataDef.of(TraitDistribution.class, TraitDistribution.Handler.class, 
IgniteMethod.DISTRIBUTION.method());
 
         /** Determines how the rows are distributed. */
-        IgniteDistribution distribution();
+        DistributionTrait distribution();
 
         /** Handler API. */
-        interface Handler extends MetadataHandler<Distribution> {
-            IgniteDistribution distribution(RelNode r, RelMetadataQuery mq);
+        interface Handler extends MetadataHandler<TraitDistribution> {
+            DistributionTrait distribution(RelNode r, RelMetadataQuery mq);
+        }
+    }
+
+    public interface TaskDistribution extends Metadata {
+        MetadataDef<TaskDistribution> DEF = 
MetadataDef.of(TaskDistribution.class, TaskDistribution.Handler.class, 
IgniteMethod.TASK_DISTRIBUTION.method());
+
+        /** Determines the source distribution of the relational expression. */
+        SourceDistribution distribution();
+
+        /** Handler API. */
+        interface Handler extends MetadataHandler<TaskDistribution> {
+            SourceDistribution distribution(RelNode r, RelMetadataQuery mq);
         }
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
index 03c4682..ed9bee6 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalExchange.java
@@ -26,8 +26,8 @@ import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.Util;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributionTraitDef;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 
 /**
  *
@@ -51,12 +51,12 @@ public final class IgniteLogicalExchange extends SingleRel 
implements IgniteRel
             Util.nLogN(rowCount) * bytesPerRow, rowCount, 0);
     }
 
-    public IgniteDistribution sourceDistribution() {
-        return 
getInput().getTraitSet().getTrait(IgniteDistributionTraitDef.INSTANCE);
+    public DistributionTrait sourceDistribution() {
+        return 
getInput().getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
     }
 
-    public IgniteDistribution targetDistribution() {
-        return getTraitSet().getTrait(IgniteDistributionTraitDef.INSTANCE);
+    public DistributionTrait targetDistribution() {
+        return getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
     }
 
     @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
index e34c8f7..8698573 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
@@ -23,10 +23,11 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
 
 public final class IgniteLogicalTableScan extends TableScan implements 
IgniteRel {
-  public IgniteLogicalTableScan(RelOptCluster cluster, RelTraitSet traitSet,
-      RelOptTable table) {
+  public IgniteLogicalTableScan(RelOptCluster cluster, RelTraitSet traitSet, 
RelOptTable table) {
     super(cluster, traitSet, table);
   }
 
@@ -35,4 +36,8 @@ public final class IgniteLogicalTableScan extends TableScan 
implements IgniteRel
 
     return this;
   }
+
+  public SourceDistribution tableDistribution() {
+     return 
getTable().unwrap(IgniteTable.class).tableDistribution(getCluster().getPlanner().getContext());
+  }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java
index 7458c97..a5abdea 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/IgniteJoinRule.java
@@ -27,8 +27,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalJoin;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributionTraitDef;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
@@ -45,16 +43,13 @@ public class IgniteJoinRule extends RelOptRule {
     @Override public void onMatch(RelOptRuleCall call) {
         LogicalJoin join = call.rel(0);
 
-        IgniteDistribution leftDist = 
join.getLeft().getTraitSet().getTrait(IgniteDistributionTraitDef.INSTANCE);
-        IgniteDistribution rightDist = 
join.getRight().getTraitSet().getTrait(IgniteDistributionTraitDef.INSTANCE);
-
         RelTraitSet leftTraits = join.getLeft().getTraitSet()
             .replace(IgniteRel.LOGICAL_CONVENTION)
-            
.replace(IgniteDistributions.hash(join.analyzeCondition().leftKeys, 
leftDist.sources()));
+            
.replace(IgniteDistributions.hash(join.analyzeCondition().leftKeys));
 
         RelTraitSet rightTraits = join.getRight().getTraitSet()
             .replace(IgniteRel.LOGICAL_CONVENTION)
-            
.replace(IgniteDistributions.hash(join.analyzeCondition().rightKeys, 
rightDist.sources()));
+            
.replace(IgniteDistributions.hash(join.analyzeCondition().rightKeys));
 
         RelNode left = convert(join.getLeft(), leftTraits);
         RelNode right = convert(join.getRight(), rightTraits);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaChangeListener.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaChangeListener.java
deleted file mode 100644
index 67cca7c..0000000
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaChangeListener.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.schema;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
-import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
-import org.apache.ignite.internal.processors.query.schema.SchemaChangeListener;
-
-/**
- *
- */
-public class CalciteSchemaChangeListener implements SchemaChangeListener {
-    private final Map<String, IgniteSchema> schemas = new HashMap<>();
-    private final CalciteSchemaHolder schemaHolder;
-
-    public CalciteSchemaChangeListener(CalciteSchemaHolder schemaHolder) {
-        this.schemaHolder = schemaHolder;
-    }
-
-    @Override public synchronized void onSchemaCreate(String schemaName) {
-        schemas.putIfAbsent(schemaName, new IgniteSchema(schemaName));
-        rebuild();
-    }
-
-    @Override public synchronized void onSchemaDrop(String schemaName) {
-        schemas.remove(schemaName);
-        rebuild();
-    }
-
-    @Override public synchronized void onSqlTypeCreate(String schemaName, 
GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo cacheInfo) {
-        schemas.computeIfAbsent(schemaName, 
IgniteSchema::new).onSqlTypeCreate(typeDescriptor, cacheInfo);
-        rebuild();
-    }
-
-    @Override public synchronized void onSqlTypeDrop(String schemaName, 
GridQueryTypeDescriptor typeDescriptor, GridCacheContextInfo cacheInfo) {
-        schemas.computeIfAbsent(schemaName, 
IgniteSchema::new).onSqlTypeDrop(typeDescriptor, cacheInfo);
-        rebuild();
-    }
-
-    public void rebuild() {
-        SchemaPlus schema = Frameworks.createRootSchema(false);
-        schemas.forEach(schema::add);
-        schemaHolder.schema(schema);
-    }
-}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaHolder.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaHolder.java
index 3978ae5..fb537e1 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaHolder.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaHolder.java
@@ -17,23 +17,52 @@
 
 package org.apache.ignite.internal.processors.query.calcite.schema;
 
-import java.util.Objects;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.processors.query.schema.SchemaChangeListener;
 
 /**
  *
  */
-public class CalciteSchemaHolder implements SchemaProvider {
+public class CalciteSchemaHolder implements SchemaChangeListener {
+    private final Map<String, IgniteSchema> schemas = new HashMap<>();
     private volatile SchemaPlus schema;
 
-    @Override public SchemaPlus schema() {
-        return Objects.requireNonNull(schema);
-    }
-
-    /**
-     * @param schema Calcite schema.
-     */
     public void schema(SchemaPlus schema) {
         this.schema = schema;
     }
+
+    /**
+     * @return Root schema most recently stored in this holder, or {@code null} if none has been stored yet.
+     */
+    public SchemaPlus schema() {
+        return this.schema;
+    }
+
+    /**
+     * Registers an empty schema under the given name unless one is already registered, then republishes
+     * the root schema.
+     *
+     * @param schemaName Name of the created schema.
+     */
+    @Override public synchronized void onSchemaCreate(String schemaName) {
+        // Instantiate IgniteSchema only when the name is new; putIfAbsent(name, new ...) allocated on every call.
+        schemas.computeIfAbsent(schemaName, IgniteSchema::new);
+        rebuild();
+    }
+
+    /**
+     * Removes the schema registered under the given name (if any) and republishes the root schema.
+     *
+     * @param schemaName Name of the dropped schema.
+     */
+    @Override public synchronized void onSchemaDrop(String schemaName) {
+        schemas.remove(schemaName);
+        rebuild();
+    }
+
+    /**
+     * Forwards a type-creation event to the schema with the given name, creating that schema first when it is
+     * not registered yet, and then republishes the root schema.
+     *
+     * @param schemaName Schema name.
+     * @param typeDescriptor Descriptor of the created type.
+     * @param cacheInfo Cache context info handed over to the schema.
+     */
+    @Override public synchronized void onSqlTypeCreate(String schemaName, GridQueryTypeDescriptor typeDescriptor,
+        GridCacheContextInfo cacheInfo) {
+        IgniteSchema target = schemas.computeIfAbsent(schemaName, IgniteSchema::new);
+
+        target.onSqlTypeCreate(typeDescriptor, cacheInfo);
+
+        rebuild();
+    }
+
+    /**
+     * Forwards a type-drop event to the schema with the given name and republishes the root schema.
+     * When no schema is registered under that name the event is ignored: nothing is created and nothing
+     * is republished.
+     *
+     * @param schemaName Schema name.
+     * @param typeDescriptor Descriptor of the dropped type.
+     * @param cacheInfo Cache context info handed over to the schema.
+     */
+    @Override public synchronized void onSqlTypeDrop(String schemaName, GridQueryTypeDescriptor typeDescriptor,
+        GridCacheContextInfo cacheInfo) {
+        // A drop must not register a schema as a side effect, hence a plain lookup instead of computeIfAbsent.
+        IgniteSchema target = schemas.get(schemaName);
+
+        if (target == null)
+            return; // Unknown schema: there is nothing to drop, so the published root schema stays as is.
+
+        target.onSqlTypeDrop(typeDescriptor, cacheInfo);
+
+        rebuild();
+    }
+
+    /**
+     * Builds a fresh root schema that contains every registered schema and stores it through
+     * {@link #schema(SchemaPlus)}. Not synchronized itself; the callers in this class are synchronized methods.
+     */
+    private void rebuild() {
+        SchemaPlus root = Frameworks.createRootSchema(false);
+
+        for (Map.Entry<String, IgniteSchema> entry : schemas.entrySet())
+            root.add(entry.getKey(), entry.getValue());
+
+        schema(root);
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
index bf95938..81f7265 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
@@ -55,7 +55,7 @@ public class IgniteSchema extends AbstractSchema {
      * @param cacheInfo Cache info.
      */
     public void onSqlTypeCreate(GridQueryTypeDescriptor typeDesc, 
GridCacheContextInfo cacheInfo) {
-        IgniteTable table = new IgniteTable(typeDesc.tableName(), 
cacheInfo.name(), Commons.rowTypeFunction(typeDesc), null);
+        IgniteTable table = new IgniteTable(typeDesc.tableName(), 
cacheInfo.name(), Commons.rowType(typeDesc));
 
         addTable(table.tableName(), table);
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index ef1606f..3cbda12 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -17,39 +17,35 @@
 
 package org.apache.ignite.internal.processors.query.calcite.schema;
 
-import java.util.function.Function;
+import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.schema.Statistics;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.impl.AbstractTable;
-import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.query.calcite.exchange.DistributionRegistry;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributionTraitDef;
+import 
org.apache.ignite.internal.processors.query.calcite.splitter.SourceDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 
 /** */
 public class IgniteTable extends AbstractTable implements TranslatableTable {
     private final String tableName;
     private final String cacheName;
-    private final Function<RelDataTypeFactory, RelDataType> rowType;
-    private final Statistic statistic;
+    private final RowType rowType;
 
-
-    public IgniteTable(String tableName, String cacheName,
-        Function<RelDataTypeFactory, RelDataType> rowType, @Nullable Statistic 
statistic) {
+    public IgniteTable(String tableName, String cacheName, RowType rowType) {
         this.tableName = tableName;
         this.cacheName = cacheName;
         this.rowType = rowType;
-
-        this.statistic = statistic == null ? Statistics.UNKNOWN : statistic;
     }
 
     /**
@@ -67,21 +63,29 @@ public class IgniteTable extends AbstractTable implements 
TranslatableTable {
     }
 
     /** {@inheritDoc} */
-    @Override public Statistic getStatistic() {
-        return statistic;
-    }
-
-    /** {@inheritDoc} */
     @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-        return rowType.apply(typeFactory);
+        return rowType.asRelDataType(typeFactory);
     }
 
     /** {@inheritDoc} */
     @Override public RelNode toRel(RelOptTable.ToRelContext context, 
RelOptTable relOptTable) {
         RelOptCluster cluster = context.getCluster();
-        RelTraitSet traitSet = cluster.traitSet()
-                .replace(IgniteRel.LOGICAL_CONVENTION)
-                .replaceIf(IgniteDistributionTraitDef.INSTANCE, () -> 
IgniteDistributions.hash(ImmutableIntList.of(0), ImmutableIntList.of()));
+        RelTraitSet traitSet = 
cluster.traitSet().replace(IgniteRel.LOGICAL_CONVENTION)
+                .replaceIf(DistributionTraitDef.INSTANCE, () -> 
IgniteDistributions.hash(rowType.distributionKeys()));
         return new IgniteLogicalTableScan(cluster, traitSet, relOptTable);
     }
+
+    /**
+     * Creates a {@link SourceDistribution} for this table.
+     * <p>
+     * {@code localInputs} holds the id of the backing cache and {@code partitionMapping} is obtained from the
+     * {@link DistributionRegistry} found in the context, for the {@link AffinityTopologyVersion} found in the
+     * same context. {@code remoteInputs} is left unset. No null checks are made on the unwrapped objects.
+     *
+     * @param context Planner context to unwrap the registry and the topology version from.
+     * @return Newly created distribution descriptor.
+     */
+    public SourceDistribution tableDistribution(Context context) {
+        GridIntList cacheIds = new GridIntList();
+        cacheIds.add(CU.cacheId(cacheName));
+
+        SourceDistribution distribution = new SourceDistribution();
+        distribution.localInputs = cacheIds;
+
+        DistributionRegistry registry = context.unwrap(DistributionRegistry.class);
+        AffinityTopologyVersion topVer = context.unwrap(AffinityTopologyVersion.class);
+
+        distribution.partitionMapping = registry.partitionMapping(CU.cacheId(cacheName), topVer);
+
+        return distribution;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/RowType.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/RowType.java
new file mode 100644
index 0000000..0da2c70
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/RowType.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.schema;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.util.ImmutableIntList;
+
+/**
+ * Row layout of a table: ordered field names, the Java type of every field, a bit mask of the fields flagged
+ * as key fields and the index of the field used as the distribution key.
+ * <p>
+ * Instances are normally created through {@link #builder()}.
+ */
+public class RowType {
+    /** Field names in declaration order. */
+    private final String[] fields;
+
+    /** Java types of the fields; indexes match {@link #fields}. */
+    private final Class<?>[] types;
+
+    /** Bit {@code i} is set when the field with index {@code i} is flagged as a key field. */
+    private final BitSet keyFields;
+
+    /** Index of the field returned by {@link #distributionKeys()}. */
+    private final int affinityKey;
+
+    /**
+     * Stores the given arguments as is, without copying or validating them.
+     *
+     * @param fields Field names.
+     * @param types Field types; expected to have the same length as {@code fields}.
+     * @param keyFields Key fields bit mask.
+     * @param affinityKey Index of the distribution key field.
+     */
+    public RowType(String[] fields, Class<?>[] types, BitSet keyFields, int affinityKey) {
+        this.fields = fields;
+        this.types = types;
+        this.keyFields = keyFields;
+        this.affinityKey = affinityKey;
+    }
+
+    /**
+     * Converts this row type into a Calcite row type with one column per field, in declaration order.
+     *
+     * @param factory Type factory used to create the column types and the resulting row type.
+     * @return Calcite row type.
+     */
+    public RelDataType asRelDataType(RelDataTypeFactory factory) {
+        RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(factory);
+
+        int len = fields.length;
+
+        for (int i = 0; i < len; i++)
+            builder.add(fields[i], factory.createJavaType(types[i]));
+
+        return builder.build();
+    }
+
+    /**
+     * @return Single-element list holding the index of the distribution key field.
+     */
+    public List<Integer> distributionKeys() {
+        return ImmutableIntList.of(affinityKey);
+    }
+
+    /**
+     * @param idx Field index.
+     * @return {@code true} if the field with the given index is flagged as a key field.
+     */
+    public boolean isKeyField(int idx) {
+        return keyFields.get(idx);
+    }
+
+    /**
+     * @return New builder that already contains the {@code _key} (index 0) and {@code _val} (index 1) fields.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Mutable builder of {@link RowType}. Fields {@code _key} and {@code _val} of type {@link Object} are
+     * always present at indexes 0 and 1; user fields follow in the order they were added.
+     */
+    public static class Builder {
+        /** Unique field names in insertion order. */
+        private final LinkedHashSet<String> fields = new LinkedHashSet<>();
+
+        /** Field types; indexes match the iteration order of {@link #fields}. */
+        private final ArrayList<Class<?>> types = new ArrayList<>();
+
+        /** Indexes of the fields added through the {@code keyField} methods. */
+        private final BitSet keyFields = new BitSet();
+
+        /** Index of the affinity key field; {@code 0} (the {@code _key} field) until one is set explicitly. */
+        private int affinityKey;
+
+        /** Creates a builder pre-populated with the {@code _key} and {@code _val} fields. */
+        private Builder() {
+            fields.add("_key");
+            types.add(Object.class);
+
+            fields.add("_val");
+            types.add(Object.class);
+        }
+
+        /**
+         * Sets the type of the {@code _key} field.
+         *
+         * @param type Key type.
+         * @return {@code this} for chaining.
+         * @throws IllegalStateException If a different, non-{@code Object} key type has been set before.
+         */
+        public Builder key(Class<?> type) {
+            if (types.get(0) != Object.class && types.get(0) != type)
+                throw new IllegalStateException("Key type is already set.");
+
+            types.set(0, type);
+
+            return this;
+        }
+
+        /**
+         * Sets the type of the {@code _val} field.
+         *
+         * @param type Value type.
+         * @return {@code this} for chaining.
+         * @throws IllegalStateException If a different, non-{@code Object} value type has been set before.
+         */
+        public Builder val(Class<?> type) {
+            if (types.get(1) != Object.class && types.get(1) != type)
+                throw new IllegalStateException("Value type is already set.");
+
+            types.set(1, type);
+
+            return this;
+        }
+
+        /**
+         * Appends a field that is not flagged as a key field.
+         *
+         * @param name Field name.
+         * @param type Field type.
+         * @return {@code this} for chaining.
+         * @throws IllegalStateException If a field with the same name already exists.
+         */
+        public Builder field(String name, Class<?> type) {
+            if (!fields.add(name))
+                throw new IllegalStateException("Field name must be unique.");
+
+            types.add(type);
+
+            return this;
+        }
+
+        /**
+         * Appends a key field that is not the affinity key.
+         *
+         * @param name Field name.
+         * @param type Field type.
+         * @return {@code this} for chaining.
+         * @throws IllegalStateException If a field with the same name already exists.
+         */
+        public Builder keyField(String name, Class<?> type) {
+            return keyField(name, type, false);
+        }
+
+        /**
+         * Appends a key field and optionally marks it as the affinity key.
+         *
+         * @param name Field name.
+         * @param type Field type.
+         * @param affinityKey Whether the field is the affinity key.
+         * @return {@code this} for chaining.
+         * @throws IllegalStateException If an affinity key field has already been added or a field with the
+         *      same name already exists.
+         */
+        public Builder keyField(String name, Class<?> type, boolean affinityKey) {
+            // Fields added here get indexes >= 2, so a positive index means an affinity key was set explicitly.
+            if (affinityKey && this.affinityKey > 0)
+                throw new IllegalStateException("Affinity key field must be unique.");
+
+            if (!fields.add(name))
+                throw new IllegalStateException("Field name must be unique.");
+
+            types.add(type);
+
+            int idx = types.size() - 1;
+
+            keyFields.set(idx);
+
+            if (affinityKey)
+                this.affinityKey = idx;
+
+            return this;
+        }
+
+        /**
+         * @return Row type holding a snapshot of the current builder state.
+         */
+        public RowType build() {
+            // Copy the bit set so that further use of this builder cannot alter the row type built here.
+            BitSet keys = (BitSet)keyFields.clone();
+
+            return new RowType(fields.toArray(new String[0]), types.toArray(new Class<?>[0]), keys, affinityKey);
+        }
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaProvider.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaProvider.java
deleted file mode 100644
index 5fdd311..0000000
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaProvider.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.schema;
-
-import org.apache.calcite.schema.SchemaPlus;
-
-/**
- *
- */
-public interface SchemaProvider {
-    SchemaPlus schema();
-}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
deleted file mode 100644
index 8c6cc61..0000000
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.schema;
-
-/**
- *
- */
-public interface TableDescriptor {
-    public boolean partitioned();
-
-    public boolean replicated();
-}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
new file mode 100644
index 0000000..eded34e
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SourceDistribution.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.splitter;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Receiver;
+import org.apache.ignite.internal.util.GridIntList;
+
+/**
+ * Plain holder with three public, mutable fields describing the inputs of a source; the class itself
+ * assigns none of them.
+ */
+public class SourceDistribution {
+    /** Partition filter for unstable topology. */
+    public Map<ClusterNode, int[]> partitionMapping;
+
+    /** Remote inputs to notify particular senders about final task distribution. */
+    public List<Receiver> remoteInputs;
+
+    /** Involved caches, used for partitions reservation. */
+    public GridIntList localInputs;
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
new file mode 100644
index 0000000..f405939
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/SplitTask.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.splitter;
+
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * Pairs a root relational node with the {@link SourceDistribution} associated with it. Both references
+ * are fixed at construction time.
+ */
+public class SplitTask {
+    /** Distribution associated with {@link #root}. */
+    public final SourceDistribution distribution;
+
+    /** Root relational node. */
+    public final RelNode root;
+
+    /**
+     * @param root Root relational node.
+     * @param distribution Distribution associated with the root node.
+     */
+    public SplitTask(RelNode root, SourceDistribution distribution) {
+        this.root = root;
+        this.distribution = distribution;
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
new file mode 100644
index 0000000..4ca6716
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/TaskSplitter.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.splitter;
+
+import org.apache.calcite.rel.RelShuttleImpl;
+
+/**
+ * Relational tree shuttle based on {@link RelShuttleImpl}. It declares no members of its own yet, so it
+ * currently behaves exactly like its base class.
+ */
+public class TaskSplitter extends RelShuttleImpl {
+
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
similarity index 75%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistribution.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
index f80fb2f..3dabee1 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistribution.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
@@ -19,13 +19,12 @@ package 
org.apache.ignite.internal.processors.query.calcite.trait;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Util;
 
 /**
  *
  */
-public interface IgniteDistribution extends RelTrait {
-    public enum DistributionType {
+public interface DistributionTrait extends RelTrait {
+    enum DistributionType {
         HASH("hash"),
         RANDOM("random"),
         BROADCAST("broadcast"),
@@ -46,25 +45,22 @@ public interface IgniteDistribution extends RelTrait {
         }
     }
 
-    IgniteDistribution ANY = new IgniteDistributionImpl(DistributionType.ANY, 
ImmutableIntList.of());
+    DistributionTrait ANY = IgniteDistributions.single();
 
     DistributionType type();
 
     @Override default RelTraitDef getTraitDef() {
-        return IgniteDistributionTraitDef.INSTANCE;
+        return DistributionTraitDef.INSTANCE;
     }
 
     @Override default boolean satisfies(RelTrait trait) {
         if (trait == this)
             return true;
 
-        if (!(trait instanceof IgniteDistribution))
+        if (!(trait instanceof DistributionTrait))
             return false;
 
-        IgniteDistribution other = (IgniteDistribution) trait;
-
-        if (!Util.startsWith(other.sources(), sources()))
-            return false;
+        DistributionTrait other = (DistributionTrait) trait;
 
         if (other.type() == DistributionType.ANY)
             return true;
@@ -79,9 +75,4 @@ public interface IgniteDistribution extends RelTrait {
      * @return Hash distribution columns ordinals or empty list otherwise.
      */
     ImmutableIntList keys();
-
-    /**
-     * @return Sources (indexes of sorted by node order nodes list for 
particular topology).
-     */
-    ImmutableIntList sources();
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributionTraitDef.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
similarity index 73%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributionTraitDef.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
index 85e1999..facc258 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributionTraitDef.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
@@ -25,20 +25,20 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLog
 /**
  *
  */
-public class IgniteDistributionTraitDef extends 
RelTraitDef<IgniteDistribution> {
+public class DistributionTraitDef extends RelTraitDef<DistributionTrait> {
     /** */
-    public static final IgniteDistributionTraitDef INSTANCE = new 
IgniteDistributionTraitDef();
+    public static final DistributionTraitDef INSTANCE = new 
DistributionTraitDef();
 
-    @Override public Class<IgniteDistribution> getTraitClass() {
-        return IgniteDistribution.class;
+    @Override public Class<DistributionTrait> getTraitClass() {
+        return DistributionTrait.class;
     }
 
     @Override public String getSimpleName() {
         return "distr";
     }
 
-    @Override public RelNode convert(RelOptPlanner planner, RelNode rel, 
IgniteDistribution targetDist, boolean allowInfiniteCostConverters) {
-        IgniteDistribution srcDist = rel.getTraitSet().getTrait(INSTANCE);
+    @Override public RelNode convert(RelOptPlanner planner, RelNode rel, 
DistributionTrait targetDist, boolean allowInfiniteCostConverters) {
+        DistributionTrait srcDist = rel.getTraitSet().getTrait(INSTANCE);
 
         // Source and Target have the same trait.
         if (srcDist.equals(targetDist))
@@ -59,11 +59,11 @@ public class IgniteDistributionTraitDef extends 
RelTraitDef<IgniteDistribution>
         }
     }
 
-    @Override public boolean canConvert(RelOptPlanner planner, 
IgniteDistribution fromTrait, IgniteDistribution toTrait) {
+    @Override public boolean canConvert(RelOptPlanner planner, 
DistributionTrait fromTrait, DistributionTrait toTrait) {
         return true;
     }
 
-    @Override public IgniteDistribution getDefault() {
-        return IgniteDistribution.ANY;
+    @Override public DistributionTrait getDefault() {
+        return DistributionTrait.ANY;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributionImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
similarity index 66%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributionImpl.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
index fdc43a1..5b7b748 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributionImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitImpl.java
@@ -23,20 +23,13 @@ import org.apache.calcite.util.ImmutableIntList;
 /**
  *
  */
-public class IgniteDistributionImpl implements IgniteDistribution {
+public class DistributionTraitImpl implements DistributionTrait {
     private final DistributionType type;
     private final ImmutableIntList keys;
 
-    private ImmutableIntList sources;
-
-    public IgniteDistributionImpl(DistributionType type, ImmutableIntList 
keys) {
-        this(type, keys, ImmutableIntList.of());
-    }
-
-    public IgniteDistributionImpl(DistributionType type, ImmutableIntList 
keys, ImmutableIntList sources) {
+    public DistributionTraitImpl(DistributionType type, ImmutableIntList keys) 
{
         this.type = type;
         this.keys = keys;
-        this.sources = sources;
     }
 
     @Override public DistributionType type() {
@@ -47,32 +40,26 @@ public class IgniteDistributionImpl implements IgniteDistribution {
         return keys;
     }
 
-    @Override public ImmutableIntList sources() {
-        return sources;
-    }
-
     @Override public void register(RelOptPlanner planner) {}
 
     @Override public boolean equals(Object o) {
         if (this == o)
             return true;
 
-        if (o instanceof IgniteDistribution) {
-            IgniteDistribution that = (IgniteDistribution) o;
+        if (o instanceof DistributionTrait) {
+            DistributionTrait that = (DistributionTrait) o;
 
-            return type == that.type()
-                && keys.equals(that.keys())
-                && sources.equals(that.sources());
+            return type == that.type() && keys.equals(that.keys());
         }
 
         return false;
     }
 
     @Override public int hashCode() {
-        return Objects.hash(type, keys, sources);
+        return Objects.hash(type, keys);
     }
 
     @Override public String toString() {
-        return type + (type == DistributionType.HASH ? keys.toString()  : "") 
+ sources;
+        return type + (type == DistributionType.HASH ? keys.toString()  : "");
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 91f2176..7537d63 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -19,26 +19,31 @@ package org.apache.ignite.internal.processors.query.calcite.trait;
 import java.util.List;
 import org.apache.calcite.util.ImmutableIntList;
 
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution.DistributionType.HASH;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution.DistributionType.RANDOM;
-import static 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution.DistributionType.SINGLE;
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait.DistributionType.HASH;
 
 /**
  *
  */
 public class IgniteDistributions {
     /** */
-    private static final IgniteDistributionTraitDef traitDef = 
IgniteDistributionTraitDef.INSTANCE;
+    private static final DistributionTraitDef traitDef = 
DistributionTraitDef.INSTANCE;
+    private static final DistributionTrait SINGLE = traitDef.canonize(new 
DistributionTraitImpl(DistributionTrait.DistributionType.SINGLE, 
ImmutableIntList.of()));
+    private static final DistributionTrait RANDOM = traitDef.canonize(new 
DistributionTraitImpl(DistributionTrait.DistributionType.RANDOM, 
ImmutableIntList.of()));
+    private static final DistributionTrait ANY    = traitDef.canonize(new 
DistributionTraitImpl(DistributionTrait.DistributionType.ANY, 
ImmutableIntList.of()));
 
-    public static IgniteDistribution random(List<Integer> sources) {
-        return traitDef.canonize(new IgniteDistributionImpl(RANDOM, 
ImmutableIntList.of(), ImmutableIntList.copyOf(sources)));
+    public static DistributionTrait any() {
+        return ANY;
     }
 
-    public static IgniteDistribution hash(List<Integer> keys, List<Integer> 
sources) {
-        return traitDef.canonize(new IgniteDistributionImpl(HASH, 
ImmutableIntList.copyOf(keys), ImmutableIntList.copyOf(sources)));
+    public static DistributionTrait random() {
+        return RANDOM;
     }
 
-    public static IgniteDistribution single(List<Integer> sources) {
-        return traitDef.canonize(new IgniteDistributionImpl(SINGLE, 
ImmutableIntList.of(), ImmutableIntList.copyOf(sources)));
+    public static DistributionTrait single() {
+        return SINGLE;
+    }
+
+    public static DistributionTrait hash(List<Integer> keys) {
+        return traitDef.canonize(new DistributionTraitImpl(HASH, 
ImmutableIntList.copyOf(keys)));
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 0fc14b0..b638815 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -20,9 +20,10 @@ package org.apache.ignite.internal.processors.query.calcite.util;
 import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiFunction;
-import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
@@ -34,12 +35,12 @@ import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryContext;
-import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -51,19 +52,33 @@ public final class Commons {
         return ctx == null ? Contexts.empty() : 
Contexts.of(ctx.unwrap(Object[].class));
     }
 
+    public static <T> @Nullable T contextParameter(Context ctx, Class<T> 
paramType, Supplier<T> paramSrc) {
+        T param = ctx.unwrap(paramType);
+
+        if (param != null)
+            return param;
+
+        return paramSrc.get();
+    }
+
     /** */
-    public static Function<RelDataTypeFactory, RelDataType> 
rowTypeFunction(GridQueryTypeDescriptor desc) {
-        return (f) -> {
-            RelDataTypeFactory.Builder builder = new 
RelDataTypeFactory.Builder(f);
+    public static RowType rowType(GridQueryTypeDescriptor desc) {
+        RowType.Builder b = RowType.builder();
 
-            builder.add(QueryUtils.KEY_FIELD_NAME, 
f.createJavaType(desc.keyClass()));
-            builder.add(QueryUtils.VAL_FIELD_NAME, 
f.createJavaType(desc.valueClass()));
+        Map<String, Class<?>> fields = desc.fields();
 
-            for (Map.Entry<String, Class<?>> prop : desc.fields().entrySet()) {
-                builder.add(prop.getKey(), f.createJavaType(prop.getValue()));
-            }
-            return builder.build();
-        };
+        b.key(desc.keyClass()).val(desc.valueClass());
+
+        for (Map.Entry<String, Class<?>> entry : fields.entrySet()) {
+            GridQueryProperty prop = desc.property(entry.getKey());
+
+            if (prop.key())
+                b.keyField(prop.name(), prop.type(), 
Objects.equals(desc.affinityKey(), prop.name()));
+            else
+                b.field(prop.name(), prop.type());
+        }
+
+        return b.build();
     }
 
     public static RelOptRuleOperand any(Class<? extends RelNode> first, 
RelTrait trait){
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
index 7bb2572..b6a6703 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
@@ -19,13 +19,15 @@ package org.apache.ignite.internal.processors.query.calcite.util;
 
 import java.lang.reflect.Method;
 import org.apache.calcite.linq4j.tree.Types;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.Distribution;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.TaskDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.TraitDistribution;
 
 /**
  *
  */
 public enum IgniteMethod {
-    DISTRIBUTION(Distribution.class, "distribution");
+    DISTRIBUTION(TraitDistribution.class, "distribution"),
+    TASK_DISTRIBUTION(TaskDistribution.class, "distribution");
 
     private final Method method;
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java
index 7d22d51..c8d8a16 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java
@@ -89,7 +89,7 @@ public class ScanIterator<T> extends GridCloseableIteratorAdapter<T> {
 
                     IgniteCacheOffheapManager.CacheDataStore ds = 
part.dataStore();
 
-                    cur = cacheId == UNDEFINED_CACHE_ID ? ds.cursor() : 
ds.cursor(cacheId);
+                    cur = cacheId == UNDEFINED_CACHE_ID ? ds.cursor() : 
ds.cursor(cacheId, false);
                 } else
                     break;
             }
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 25298e4..028bb9c 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -25,11 +25,10 @@ import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
@@ -37,7 +36,8 @@ import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase;
 import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
@@ -45,9 +45,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static 
org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
-import static 
org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME;
-
 /**
  *
  */
@@ -55,6 +52,7 @@ import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_N
 public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
     private static CalciteQueryProcessor proc;
+    private static SchemaPlus schema;
 
     @BeforeClass
     public static void setupClass() {
@@ -65,60 +63,38 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
         IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
 
-        publicSchema.addTable("Developer", new IgniteTable("Developer", 
"Developer", (f) -> {
-            RelDataTypeFactory.Builder builder = new 
RelDataTypeFactory.Builder(f);
-
-            builder.add(KEY_FIELD_NAME, f.createJavaType(Integer.class));
-            builder.add(VAL_FIELD_NAME, f.createJavaType(Integer.class));
-            builder.add("id", f.createJavaType(Integer.class));
-            builder.add("name", f.createJavaType(String.class));
-            builder.add("projectId", f.createJavaType(Integer.class));
-            builder.add("cityId", f.createJavaType(Integer.class));
-
-            return builder.build();
-        }, null));
-
-        publicSchema.addTable("Project", new IgniteTable("Project", "Project", 
(f) -> {
-            RelDataTypeFactory.Builder builder = new 
RelDataTypeFactory.Builder(f);
-
-            builder.add(KEY_FIELD_NAME, f.createJavaType(Integer.class));
-            builder.add(VAL_FIELD_NAME, f.createJavaType(Integer.class));
-            builder.add("id", f.createJavaType(Integer.class));
-            builder.add("name", f.createJavaType(String.class));
-            builder.add("ver", f.createJavaType(Integer.class));
-
-            return builder.build();
-        }, null));
-
-        publicSchema.addTable("Country", new IgniteTable("Country", "Country", 
(f) -> {
-            RelDataTypeFactory.Builder builder = new 
RelDataTypeFactory.Builder(f);
-
-            builder.add(KEY_FIELD_NAME, f.createJavaType(Integer.class));
-            builder.add(VAL_FIELD_NAME, f.createJavaType(Integer.class));
-            builder.add("id", f.createJavaType(Integer.class));
-            builder.add("name", f.createJavaType(String.class));
-            builder.add("countryCode", f.createJavaType(Integer.class));
-
-            return builder.build();
-        }, null));
-
-        publicSchema.addTable("City", new IgniteTable("City", "City", (f) -> {
-            RelDataTypeFactory.Builder builder = new 
RelDataTypeFactory.Builder(f);
-
-            builder.add(KEY_FIELD_NAME, f.createJavaType(Integer.class));
-            builder.add(VAL_FIELD_NAME, f.createJavaType(Integer.class));
-            builder.add("id", f.createJavaType(Integer.class));
-            builder.add("name", f.createJavaType(String.class));
-            builder.add("countryId", f.createJavaType(Integer.class));
-
-            return builder.build();
-        }, null));
-
-        SchemaPlus schema = Frameworks.createRootSchema(false);
+        publicSchema.addTable("Developer", new IgniteTable("Developer", 
"Developer",
+            RowType.builder()
+                .keyField("id", Integer.class, true)
+                .field("name", String.class)
+                .field("projectId", Integer.class)
+                .field("cityId", Integer.class)
+                .build()));
+
+        publicSchema.addTable("Project", new IgniteTable("Project", "Project",
+            RowType.builder()
+                .keyField("id", Integer.class, true)
+                .field("name", String.class)
+                .field("ver", Integer.class)
+                .build()));
+
+        publicSchema.addTable("Country", new IgniteTable("Country", "Country",
+            RowType.builder()
+                .keyField("id", Integer.class, true)
+                .field("name", String.class)
+                .field("countryCode", Integer.class)
+                .build()));
+
+        publicSchema.addTable("City", new IgniteTable("City", "City",
+            RowType.builder()
+                .keyField("id", Integer.class, true)
+                .field("name", String.class)
+                .field("countryId", Integer.class)
+                .build()));
+
+        schema = Frameworks.createRootSchema(false);
 
         schema.add("PUBLIC", publicSchema);
-
-        proc.schemaHolder().schema(schema);
     }
 
     @Test
@@ -130,12 +106,12 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.projectId = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        Context ctx = proc.context(Contexts.empty(), sql, new Object[]{2});
+        Context ctx = proc.context(Contexts.of(schema, 
AffinityTopologyVersion.NONE), sql, new Object[]{2});
 
         assertNotNull(ctx);
 
         RelTraitDef[] traitDefs = {
-            IgniteDistributionTraitDef.INSTANCE,
+            DistributionTraitDef.INSTANCE,
             ConventionTraitDef.INSTANCE
         };
 
@@ -164,7 +140,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
             RelTraitSet desired = rel.getCluster().traitSet()
                 .replace(IgniteRel.LOGICAL_CONVENTION)
-                .replace(IgniteDistributions.single(ImmutableIntList.of()))
+                .replace(IgniteDistributions.single())
                 .simplify();
 
             rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, 
rel, desired);

Reply via email to