This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 4a842bf  refactoring
4a842bf is described below

commit 4a842bf4d7559377e55013bc9467fc0f05de0d24
Author: Igor Seliverstov <[email protected]>
AuthorDate: Wed Nov 13 16:09:05 2019 +0300

    refactoring
---
 .../query/calcite/cluster/RegistryImpl.java        |  47 ++++----
 .../query/calcite/metadata/FragmentLocation.java   |  48 +++++++--
 .../calcite/metadata/IgniteMdFragmentLocation.java |  45 +++-----
 .../query/calcite/metadata/LocationRegistry.java   |   6 +-
 .../metadata/{Location.java => NodesMapping.java}  | 119 ++++++++++++---------
 .../query/calcite/metadata/RelMetadataQueryEx.java |  20 ++++
 .../query/calcite/rel/IgniteExchange.java          |   6 ++
 .../processors/query/calcite/rel/IgniteFilter.java |   6 ++
 .../query/calcite/rel/IgniteHashJoin.java          |   6 ++
 .../query/calcite/rel/IgniteProject.java           |   6 ++
 .../processors/query/calcite/rel/IgniteRel.java    |   3 +
 .../query/calcite/rel/IgniteTableScan.java         |  10 +-
 .../processors/query/calcite/rel/Receiver.java     |  15 ++-
 .../processors/query/calcite/rel/Sender.java       |  12 ++-
 .../query/calcite/schema/IgniteTable.java          |  20 ++--
 .../query/calcite/splitter/Fragment.java           |  36 +++----
 .../query/calcite/splitter/QueryPlan.java          |   5 +-
 .../query/calcite/trait/IgniteDistributions.java   |  20 +++-
 .../processors/query/calcite/util/Commons.java     |   4 -
 .../Implementor.java}                              |  14 +--
 .../query/calcite/CalciteQueryProcessorTest.java   |  64 +++++------
 21 files changed, 310 insertions(+), 202 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
index 4ad7d48..1736da0 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
@@ -29,14 +29,15 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
-import org.apache.ignite.internal.processors.query.calcite.metadata.Location;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunctionFactory;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  *
@@ -58,22 +59,22 @@ public class RegistryImpl implements DistributionRegistry, 
LocationRegistry {
         return IgniteDistributions.hash(rowType.distributionKeys(), new 
AffinityFactory(partFun, key));
     }
 
-    @Override public Location single(AffinityTopologyVersion topVer) {
-        return new 
Location(Collections.singletonList(ctx.discovery().localNode()), null, (byte) 
0);
+    @Override public NodesMapping local() {
+        return new 
NodesMapping(Collections.singletonList(ctx.discovery().localNode()), null, 
(byte) 0);
     }
 
-    @Override public Location random(AffinityTopologyVersion topVer) {
-        return new Location(ctx.discovery().discoCache(topVer).serverNodes(), 
null, (byte) 0);
+    @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+        return new 
NodesMapping(ctx.discovery().discoCache(topVer).serverNodes(), null, (byte) 0);
     }
 
-    @Override public Location distributed(int cacheId, AffinityTopologyVersion 
topVer) {
+    @Override public NodesMapping distributed(int cacheId, 
AffinityTopologyVersion topVer) {
         GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
 
         return cctx.isReplicated() ? replicatedLocation(cctx, topVer) : 
partitionedLocation(cctx, topVer);
     }
 
-    private Location partitionedLocation(GridCacheContext cctx, 
AffinityTopologyVersion topVer) {
-        byte flags = Location.HAS_PARTITIONED_CACHES;
+    private NodesMapping partitionedLocation(GridCacheContext cctx, 
AffinityTopologyVersion topVer) {
+        byte flags = NodesMapping.HAS_PARTITIONED_CACHES;
 
         List<List<ClusterNode>> assignments = 
cctx.affinity().assignments(topVer);
 
@@ -86,7 +87,7 @@ public class RegistryImpl implements DistributionRegistry, 
LocationRegistry {
             assignments = assignments0;
         }
         else if (!cctx.topology().rebalanceFinished(topVer)) {
-            flags |= Location.HAS_MOVING_PARTITIONS;
+            flags |= NodesMapping.HAS_MOVING_PARTITIONS;
 
             List<List<ClusterNode>> assignments0 = new 
ArrayList<>(assignments.size());
 
@@ -104,19 +105,19 @@ public class RegistryImpl implements 
DistributionRegistry, LocationRegistry {
             assignments = assignments0;
         }
 
-        return new Location(null, assignments, flags);
+        return new NodesMapping(null, assignments, flags);
     }
 
-    private Location replicatedLocation(GridCacheContext cctx, 
AffinityTopologyVersion topVer) {
-        byte flags = Location.HAS_REPLICATED_CACHES;
+    private NodesMapping replicatedLocation(GridCacheContext cctx, 
AffinityTopologyVersion topVer) {
+        byte flags = NodesMapping.HAS_REPLICATED_CACHES;
 
         if (cctx.config().getNodeFilter() != null)
-            flags |= Location.PARTIALLY_REPLICATED;
+            flags |= NodesMapping.PARTIALLY_REPLICATED;
 
         List<ClusterNode> nodes = 
cctx.discovery().discoCache(topVer).cacheGroupAffinityNodes(cctx.cacheId());
 
         if (!cctx.topology().rebalanceFinished(topVer)) {
-            flags |= Location.PARTIALLY_REPLICATED;
+            flags |= NodesMapping.PARTIALLY_REPLICATED;
 
             List<ClusterNode> nodes0 = new ArrayList<>(nodes.size());
 
@@ -135,7 +136,7 @@ public class RegistryImpl implements DistributionRegistry, 
LocationRegistry {
             nodes = nodes0;
         }
 
-        return new Location(nodes, null, flags);
+        return new NodesMapping(nodes, null, flags);
     }
 
     private static class AffinityFactory implements DestinationFunctionFactory 
{
@@ -148,9 +149,17 @@ public class RegistryImpl implements DistributionRegistry, 
LocationRegistry {
         }
 
         @Override public DestinationFunction create(FragmentLocation 
targetLocation, ImmutableIntList keys) {
-            assert keys.size() == 1 && targetLocation.location != null;
+            assert keys.size() == 1 && targetLocation.mapping() != null && 
!F.isEmpty(targetLocation.mapping().assignments());
 
-            return create(targetLocation.location, partFun, keys.getInt(0));
+            List<List<ClusterNode>> assignments = 
targetLocation.mapping().assignments();
+
+            if (U.assertionsEnabled()) {
+                for (List<ClusterNode> assignment : assignments) {
+                    assert F.isEmpty(assignment) || assignment.size() == 1;
+                }
+            }
+
+            return create(assignments, partFun, keys.getInt(0));
         }
 
         @Override public boolean equals(Object o) {
@@ -166,8 +175,8 @@ public class RegistryImpl implements DistributionRegistry, 
LocationRegistry {
             return key.hashCode();
         }
 
-        private static DestinationFunction create(Location location, 
ToIntFunction<Object> partFun, int affField) {
-            return row -> location.nodes(partFun.applyAsInt(((Object[]) 
row)[affField]));
+        private static DestinationFunction create(List<List<ClusterNode>> 
assignments, ToIntFunction<Object> partFun, int affField) {
+            return row -> assignments.get(partFun.applyAsInt(((Object[]) 
row)[affField]));
         }
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
index 7106e81..26c203e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
@@ -16,17 +16,53 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
-import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-import org.apache.ignite.internal.util.GridIntList;
 
 /**
  *
  */
 public class FragmentLocation {
-    public Location location;
-    public List<Receiver> remoteInputs;
-    public GridIntList localInputs;
-    public AffinityTopologyVersion topVer;
+    private NodesMapping mapping;
+
+    private final ImmutableList<Receiver> remoteInputs;
+    private final ImmutableIntList localInputs;
+    private final AffinityTopologyVersion topVer;
+
+    public FragmentLocation(ImmutableList<Receiver> remoteInputs, 
AffinityTopologyVersion topVer) {
+        this(null, remoteInputs, null, topVer);
+    }
+
+    public FragmentLocation(NodesMapping mapping, ImmutableIntList 
localInputs, AffinityTopologyVersion topVer) {
+        this(mapping, null, localInputs, topVer);
+    }
+
+    public FragmentLocation(NodesMapping mapping, ImmutableList<Receiver> 
remoteInputs, ImmutableIntList localInputs, AffinityTopologyVersion topVer) {
+        this.mapping = mapping;
+        this.remoteInputs = remoteInputs;
+        this.localInputs = localInputs;
+        this.topVer = topVer;
+    }
+
+    public NodesMapping mapping() {
+        return mapping;
+    }
+
+    public void mapping(NodesMapping mapping) {
+        this.mapping = mapping;
+    }
+
+    public ImmutableList<Receiver> remoteInputs() {
+        return remoteInputs;
+    }
+
+    public ImmutableIntList localInputs() {
+        return localInputs;
+    }
+
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
index 0c20e59..5e6e8a9 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
@@ -16,8 +16,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
-import java.util.Collections;
-import java.util.List;
+import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.volcano.RelSubset;
@@ -29,15 +28,14 @@ import org.apache.calcite.rel.metadata.MetadataHandler;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentLocationMetadata;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.Edge;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
-import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
@@ -79,9 +77,9 @@ public class IgniteMdFragmentLocation implements 
MetadataHandler<FragmentLocatio
         }
         catch (LocationMappingException e) {
             // a replicated cache is cheaper to redistribute
-            if (!leftLoc.location.hasPartitionedCaches())
+            if (!leftLoc.mapping().hasPartitionedCaches())
                 throw planningException(rel, e, true);
-            else if (!rightLoc.location.hasPartitionedCaches())
+            else if (!rightLoc.mapping().hasPartitionedCaches())
                 throw planningException(rel, e, false);
 
             // both sub-trees have partitioned sources, less cost is better
@@ -104,12 +102,8 @@ public class IgniteMdFragmentLocation implements 
MetadataHandler<FragmentLocatio
     }
 
     public FragmentLocation getLocation(Receiver rel, RelMetadataQuery mq) {
-        FragmentLocation res = new FragmentLocation();
-
-        res.remoteInputs = Collections.singletonList(rel);
-        res.topVer = 
rel.getCluster().getPlanner().getContext().unwrap(AffinityTopologyVersion.class);
-
-        return res;
+        return new FragmentLocation(ImmutableList.of(rel),
+            
rel.getCluster().getPlanner().getContext().unwrap(AffinityTopologyVersion.class));
     }
 
     public FragmentLocation getLocation(IgniteTableScan rel, RelMetadataQuery 
mq) {
@@ -121,17 +115,13 @@ public class IgniteMdFragmentLocation implements 
MetadataHandler<FragmentLocatio
     }
 
     private static FragmentLocation merge(FragmentLocation left, 
FragmentLocation right) throws LocationMappingException {
-        FragmentLocation res = new FragmentLocation();
-
-        res.location = merge(left.location, right.location);
-        res.remoteInputs = merge(left.remoteInputs, right.remoteInputs);
-        res.localInputs = merge(left.localInputs, right.localInputs);
-        res.topVer = U.firstNotNull(left.topVer, right.topVer);
-
-        return res;
+        return new FragmentLocation(merge(left.mapping(), right.mapping()),
+            merge(left.remoteInputs(), right.remoteInputs()),
+            merge(left.localInputs(), right.localInputs()),
+            U.firstNotNull(left.topologyVersion(), right.topologyVersion()));
     }
 
-    private static Location merge(Location left, Location right) throws 
LocationMappingException {
+    private static NodesMapping merge(NodesMapping left, NodesMapping right) 
throws LocationMappingException {
         if (left == null)
             return right;
         if (right == null)
@@ -140,22 +130,21 @@ public class IgniteMdFragmentLocation implements 
MetadataHandler<FragmentLocatio
         return left.mergeWith(right);
     }
 
-    private static <T> List<T> merge(List<T> left, List<T> right) {
+    private static <T> ImmutableList<T> merge(ImmutableList<T> left, 
ImmutableList<T> right) {
         if (left == null)
             return right;
         if (right == null)
             return left;
 
-        return Commons.union(left, right);
+        return ImmutableList.<T>builder().addAll(left).addAll(right).build();
     }
 
-    private static GridIntList merge(GridIntList left, GridIntList right) {
+    private static ImmutableIntList merge(ImmutableIntList left, 
ImmutableIntList right) {
         if (left == null)
             return right;
+        if (right == null)
+            return left;
 
-        if (right != null)
-            left.addAll(right);
-
-        return left;
+        return left.appendAll(right);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
index d81e440..bf62302 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
@@ -22,7 +22,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
  *
  */
 public interface LocationRegistry {
-    Location single(AffinityTopologyVersion topVer); // returns local node 
with single partition
-    Location random(AffinityTopologyVersion topVer); // returns random 
distribution, partitions count depends on nodes count
-    Location distributed(int cacheId, AffinityTopologyVersion topVer); // 
returns cache distribution
+    NodesMapping local(); // returns local node with single partition
+    NodesMapping random(AffinityTopologyVersion topVer); // returns random 
distribution, partitions count depends on nodes count
+    NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer); // 
returns cache distribution
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/Location.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
similarity index 74%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/Location.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
index c83d16b..d40af2a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/Location.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
@@ -20,26 +20,28 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  *
  */
-public class Location {
-    public static byte HAS_MOVING_PARTITIONS = 0x1;
-    public static byte HAS_REPLICATED_CACHES = 0x2;
-    public static byte HAS_PARTITIONED_CACHES = 0x4;
-    public static byte PARTIALLY_REPLICATED = 0x8;
-    public static byte DEDUPLICATED = 0x16;
+public class NodesMapping {
+    public static final byte HAS_MOVING_PARTITIONS = 0x1;
+    public static final byte HAS_REPLICATED_CACHES = 0x2;
+    public static final byte HAS_PARTITIONED_CACHES = 0x4;
+    public static final byte PARTIALLY_REPLICATED = 0x8;
+    public static final byte DEDUPLICATED = 0x16;
 
     private final List<ClusterNode> nodes;
     private final List<List<ClusterNode>> assignments;
     private final byte flags;
 
-    public Location(List<ClusterNode> nodes, List<List<ClusterNode>> 
assignments, byte flags) {
+    public NodesMapping(List<ClusterNode> nodes, List<List<ClusterNode>> 
assignments, byte flags) {
         this.nodes = nodes;
         this.assignments = assignments;
         this.flags = flags;
@@ -49,15 +51,15 @@ public class Location {
         return nodes;
     }
 
-    public List<ClusterNode> nodes(int part) {
-        return assignments.get(part % assignments.size());
+    public List<List<ClusterNode>> assignments() {
+        return assignments;
     }
 
-    public Location mergeWith(Location other) throws LocationMappingException {
+    public NodesMapping mergeWith(NodesMapping other) throws 
LocationMappingException {
         byte flags = (byte) (this.flags | other.flags);
 
         if ((flags & PARTIALLY_REPLICATED) == 0)
-            return new Location(U.firstNotNull(nodes, other.nodes), 
mergeAssignments(other, null), flags);
+            return new NodesMapping(U.firstNotNull(nodes, other.nodes), 
mergeAssignments(other, null), flags);
 
         List<ClusterNode> nodes;
 
@@ -71,10 +73,65 @@ public class Location {
         if (nodes != null && nodes.isEmpty())
             throw new LocationMappingException("Failed to map fragment to 
location.");
 
-        return new Location(nodes, mergeAssignments(other, nodes), flags);
+        return new NodesMapping(nodes, mergeAssignments(other, nodes), flags);
+    }
+
+    public NodesMapping deduplicate() throws LocationMappingException {
+        if (assignments == null || !excessive())
+            return this;
+
+        HashSet<ClusterNode> nodes0 = new HashSet<>();
+        List<List<ClusterNode>> assignments0 = new 
ArrayList<>(assignments.size());
+
+        for (List<ClusterNode> partNodes : assignments) {
+            ClusterNode node = F.first(partNodes);
+
+            if (node == null)
+                throw new LocationMappingException("Failed to map fragment to 
location.");
+
+            assignments0.add(Collections.singletonList(node));
+            nodes0.add(node);
+        }
+
+        return new NodesMapping(new ArrayList<>(nodes0), assignments0, 
(byte)(flags | DEDUPLICATED));
+    }
+
+    public int[] partitions(ClusterNode node) {
+        if (assignments == null)
+            return null;
+
+        GridIntList parts = new GridIntList(assignments.size());
+
+        for (int i = 0; i < assignments.size(); i++) {
+            List<ClusterNode> assignment = assignments.get(i);
+            if (Objects.equals(node, F.first(assignment)))
+                parts.add(i);
+        }
+
+        return parts.array();
+    }
+
+    public boolean excessive() {
+        return (flags & DEDUPLICATED) == 0;
+    }
+
+    public boolean hasMovingPartitions() {
+        return (flags & HAS_MOVING_PARTITIONS) == HAS_MOVING_PARTITIONS;
+    }
+
+    public boolean hasReplicatedCaches() {
+        return (flags & HAS_REPLICATED_CACHES) == HAS_REPLICATED_CACHES;
+    }
+
+    public boolean hasPartitionedCaches() {
+        return (flags & HAS_PARTITIONED_CACHES) == HAS_PARTITIONED_CACHES;
     }
 
-    private List<List<ClusterNode>> mergeAssignments(Location other, 
List<ClusterNode> nodes) throws LocationMappingException {
+    public boolean partiallyReplicated() {
+        return (flags & PARTIALLY_REPLICATED) == PARTIALLY_REPLICATED;
+    }
+
+    private List<List<ClusterNode>> mergeAssignments(NodesMapping other, 
List<ClusterNode> nodes) throws LocationMappingException {
         byte flags = (byte) (this.flags | other.flags); 
List<List<ClusterNode>> left = assignments, right = other.assignments;
 
         if (left == null && right == null)
@@ -126,40 +183,4 @@ public class Location {
 
         return assignments;
     }
-
-    public Location deduplicate() throws LocationMappingException {
-        if (assignments == null || (flags & DEDUPLICATED) == DEDUPLICATED)
-            return this;
-
-        HashSet<ClusterNode> nodes0 = new HashSet<>();
-        List<List<ClusterNode>> assignments0 = new 
ArrayList<>(assignments.size());
-
-        for (List<ClusterNode> partNodes : assignments) {
-            ClusterNode node = F.first(partNodes);
-
-            if (node == null)
-                throw new LocationMappingException("Failed to map fragment to 
location.");
-
-            assignments0.add(Collections.singletonList(node));
-            nodes0.add(node);
-        }
-
-        return new Location(new ArrayList<>(nodes0), assignments0, 
(byte)(flags | DEDUPLICATED));
-    }
-
-    public boolean hasMovingPartitions() {
-        return (flags & HAS_MOVING_PARTITIONS) == HAS_MOVING_PARTITIONS;
-    }
-
-    public boolean hasReplicatedCaches() {
-        return (flags & HAS_REPLICATED_CACHES) == HAS_REPLICATED_CACHES;
-    }
-
-    public boolean hasPartitionedCaches() {
-        return (flags & HAS_PARTITIONED_CACHES) == HAS_PARTITIONED_CACHES;
-    }
-
-    public boolean partiallyReplicated() {
-        return (flags & PARTIALLY_REPLICATED) == PARTIALLY_REPLICATED;
-    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
index fc6e4fa..16d662d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -16,9 +16,17 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
+import java.util.Arrays;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import org.jetbrains.annotations.NotNull;
 
@@ -29,6 +37,18 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
     private static final RelMetadataQueryEx PROTO = new RelMetadataQueryEx();
     private static final JaninoRelMetadataProvider PROVIDER = 
JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER);
 
+    static {
+        PROVIDER.register(Arrays.asList(
+            IgniteExchange.class,
+            IgniteFilter.class,
+            IgniteHashJoin.class,
+            IgniteProject.class,
+            IgniteTableScan.class,
+            Receiver.class,
+            Sender.class
+        ));
+    }
+
     private IgniteMetadata.DistributionTraitMetadata.Handler 
distributionTraitHandler;
     private IgniteMetadata.FragmentLocationMetadata.Handler 
sourceDistributionHandler;
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
index 5fce676..13f9b92 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.Util;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
 
 /**
  *
@@ -54,6 +55,11 @@ public final class IgniteExchange extends SingleRel 
implements IgniteRel {
         return new IgniteExchange(getCluster(), traitSet, sole(inputs));
     }
 
+    /** {@inheritDoc} */
+    @Override public <T> T implement(Implementor<T> implementor) {
+        return implementor.implement(this);
+    }
+
     @Override public RelWriter explainTerms(RelWriter pw) {
         return super.explainTerms(pw)
             .item("distribution", 
getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
index c1991e8..6ddafb8 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
@@ -28,6 +28,7 @@ import org.apache.calcite.rex.RexNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
 
 public final class IgniteFilter extends Filter implements IgniteRel {
   private final Set<CorrelationId> variablesSet;
@@ -47,6 +48,11 @@ public final class IgniteFilter extends Filter implements 
IgniteRel {
     return new IgniteFilter(getCluster(), traitSet, input, condition, 
variablesSet);
   }
 
+  /** {@inheritDoc} */
+  @Override public <T> T implement(Implementor<T> implementor) {
+    return implementor.implement(this);
+  }
+
   @Override public RelWriter explainTerms(RelWriter pw) {
     return super.explainTerms(pw)
         .itemIf("variablesSet", variablesSet, !variablesSet.isEmpty());
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
index 6450abd..dd3c725 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
 
 public final class IgniteHashJoin extends Join implements IgniteRel {
   private final boolean semiJoinDone;
@@ -47,6 +48,11 @@ public final class IgniteHashJoin extends Join implements 
IgniteRel {
     return new IgniteHashJoin(getCluster(), traitSet, left, right, 
conditionExpr, variablesSet, joinType, semiJoinDone);
   }
 
+  /** {@inheritDoc} */
+  @Override public <T> T implement(Implementor<T> implementor) {
+    return implementor.implement(this);
+  }
+
   @Override public RelWriter explainTerms(RelWriter pw) {
     // Don't ever print semiJoinDone=false. This way, we
     // don't clutter things up in optimizers that don't use semi-joins.
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
index 1f3eaf2..e8ce5c1 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rex.RexNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
 
 public final class IgniteProject extends Project implements IgniteRel {
   public IgniteProject(
@@ -42,6 +43,11 @@ public final class IgniteProject extends Project implements 
IgniteRel {
     return new IgniteProject(getCluster(), traitSet, input, projects, rowType);
   }
 
+  /** {@inheritDoc} */
+  @Override public <T> T implement(Implementor<T> implementor) {
+    return implementor.implement(this);
+  }
+
   public static IgniteProject create(Project project, RelNode input) {
     RelTraitSet traits = project.getTraitSet()
         .replace(IgniteRel.IGNITE_CONVENTION)
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
index d444f4b..211dc20 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
@@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.rel;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
 
 /**
  *
@@ -32,4 +33,6 @@ public interface IgniteRel extends RelNode {
                 && toTraits.containsIfApplicable(IGNITE_CONVENTION); // 
Enables trait definition conversion
         }
     };
+
+    <T> T implement(Implementor<T> implementor);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
index 4f854c3..969180d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
 
 public final class IgniteTableScan extends TableScan implements IgniteRel {
   public IgniteTableScan(RelOptCluster cluster, RelTraitSet traitSet, 
RelOptTable table) {
@@ -37,8 +37,12 @@ public final class IgniteTableScan extends TableScan 
implements IgniteRel {
     return this;
   }
 
+  /** {@inheritDoc} */
+  @Override public <T> T implement(Implementor<T> implementor) {
+    return implementor.implement(this);
+  }
+
   public FragmentLocation location() {
-    boolean local = !getTraitSet().isEnabled(DistributionTraitDef.INSTANCE);
-    return 
getTable().unwrap(IgniteTable.class).location(getCluster().getPlanner().getContext(),
 local);
+    return 
getTable().unwrap(IgniteTable.class).location(getCluster().getPlanner().getContext());
   }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
index 74c9d1b..7e5747c 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
@@ -21,9 +21,11 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentLocation;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
 
 /**
  *
@@ -50,10 +52,15 @@ public final class Receiver extends SingleRel implements 
IgniteRel {
         return new Receiver(getCluster(), traitSet, (Sender) sole(inputs));
     }
 
-    public void init(FragmentLocation targetDistribution, RelMetadataQueryEx 
mq) {
-        getInput().init(targetDistribution, 
getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
+    /** {@inheritDoc} */
+    @Override public <T> T implement(Implementor<T> implementor) {
+        return implementor.implement(this);
+    }
 
-        sourceDistribution = getInput().location(mq);
+    public void init(FragmentLocation targetDistribution, RelMetadataQuery mq) 
{
+        sourceDistribution = IgniteMdFragmentLocation.location(getInput(), mq);
+
+        getInput().init(targetDistribution, 
getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
     }
 
     public FragmentLocation sourceDistribution() {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
index f53b953..959f2cb 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
@@ -23,9 +23,10 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentLocation;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
 
 /**
  *
@@ -51,6 +52,11 @@ public final class Sender extends SingleRel implements 
IgniteRel {
         return new Sender(getCluster(), traitSet, sole(inputs));
     }
 
+    /** {@inheritDoc} */
+    @Override public <T> T implement(Implementor<T> implementor) {
+        return implementor.implement(this);
+    }
+
     public void init(FragmentLocation targetLocation, DistributionTrait 
targetDistribution) {
         this.targetLocation = targetLocation;
         this.targetDistribution = targetDistribution;
@@ -58,7 +64,7 @@ public final class Sender extends SingleRel implements 
IgniteRel {
 
     public DestinationFunction targetFunction() {
         if (destinationFunction == null) {
-            assert targetLocation != null && targetLocation.location != null 
&& targetDistribution != null;
+            assert targetLocation != null && targetLocation.mapping() != null 
&& targetDistribution != null;
 
             destinationFunction = 
targetDistribution.destinationFunctionFactory().create(targetLocation, 
targetDistribution.keys());
         }
@@ -68,7 +74,7 @@ public final class Sender extends SingleRel implements 
IgniteRel {
 
     public FragmentLocation location(RelMetadataQuery mq) {
         if (location == null)
-            location = 
RelMetadataQueryEx.wrap(mq).getFragmentLocation(getInput());
+            location = IgniteMdFragmentLocation.location(getInput(), mq);
 
         return location;
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index f0d1a2a..3ba7b50 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -26,15 +26,16 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
 /** */
@@ -80,21 +81,12 @@ public class IgniteTable extends AbstractTable implements 
TranslatableTable {
         return 
distributionRegistry(context).distribution(CU.cacheId(cacheName), rowType);
     }
 
-    public FragmentLocation location(Context context, boolean local) {
+    public FragmentLocation location(Context ctx) {
         int cacheId = CU.cacheId(cacheName);
+        AffinityTopologyVersion topVer = topologyVersion(ctx);
+        NodesMapping mapping = locationRegistry(ctx).distributed(cacheId, 
topVer);
 
-        FragmentLocation res = new FragmentLocation();
-
-        GridIntList localInputs = new GridIntList();
-        localInputs.add(cacheId);
-        res.localInputs = localInputs;
-
-        if (!local)
-            res.location = locationRegistry(context).distributed(cacheId, 
topologyVersion(context));
-
-        res.topVer = topologyVersion(context);
-
-        return res;
+        return new FragmentLocation(mapping, ImmutableIntList.of(cacheId), 
topVer);
     }
 
     private LocationRegistry locationRegistry(Context ctx) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index 2fd35d7..75bc37e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -18,12 +18,13 @@ package 
org.apache.ignite.internal.processors.query.calcite.splitter;
 
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentLocation;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.LocationMappingException;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
 import org.apache.ignite.internal.util.typedef.F;
@@ -40,36 +41,28 @@ public class Fragment {
         this.rel = rel;
     }
 
-    public void init(Context ctx) {
-        RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
+    public void init(Context ctx, RelMetadataQuery mq) {
+        fragmentLocation = IgniteMdFragmentLocation.location(rel, mq);
 
-        fragmentLocation = mq.getFragmentLocation(rel);
-
-        AffinityTopologyVersion topVer = topologyVersion(ctx);
-
-        if (fragmentLocation.location == null) {
-            if (!isRoot())
-                fragmentLocation.location = registry(ctx).random(topVer);
-            else if (!F.isEmpty(fragmentLocation.remoteInputs))
-                fragmentLocation.location = registry(ctx).single(topVer);
-        }
+        if (fragmentLocation.mapping() == null)
+            fragmentLocation.mapping(remote() ? 
registry(ctx).random(topologyVersion(ctx)) : registry(ctx).local());
         else {
             try {
-                fragmentLocation.location = 
fragmentLocation.location.deduplicate();
+                
fragmentLocation.mapping(fragmentLocation.mapping().deduplicate());
             }
             catch (LocationMappingException e) {
                 throw new IgniteSQLException("Failed to map fragment to 
location, partition lost.", e);
             }
         }
 
-        if (!F.isEmpty(fragmentLocation.remoteInputs)) {
-            for (Receiver input : fragmentLocation.remoteInputs)
+        if (!F.isEmpty(fragmentLocation.remoteInputs())) {
+            for (Receiver input : fragmentLocation.remoteInputs())
                 input.init(fragmentLocation, mq);
         }
     }
 
-    private boolean isRoot() {
-        return !(rel instanceof Sender);
+    private boolean remote() {
+        return rel instanceof Sender;
     }
 
     private LocationRegistry registry(Context ctx) {
@@ -81,13 +74,12 @@ public class Fragment {
     }
 
     public void reset() {
-        if (rel instanceof Sender)
+        if (remote())
             ((Sender) rel).reset();
 
-        if (fragmentLocation != null && 
!F.isEmpty(fragmentLocation.remoteInputs)) {
-            for (Receiver receiver : fragmentLocation.remoteInputs) {
+        if (fragmentLocation != null && 
!F.isEmpty(fragmentLocation.remoteInputs())) {
+            for (Receiver receiver : fragmentLocation.remoteInputs())
                 receiver.reset();
-            }
         }
 
         fragmentLocation = null;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index b944a9b..a95c7fd 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -23,6 +23,7 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
 import org.apache.ignite.internal.processors.query.calcite.util.Edge;
@@ -40,10 +41,12 @@ public class QueryPlan {
     public void init(Context ctx) {
         int i = 0;
 
+        RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
+
         while (true) {
             try {
                 for (Fragment fragment : fragments)
-                    fragment.init(ctx);
+                    fragment.init(ctx, mq);
 
                 break;
             }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 57ccaf8..1237421 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -28,6 +28,8 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.cluster.ClusterNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
 
@@ -37,6 +39,8 @@ import static 
org.apache.ignite.internal.processors.query.calcite.trait.Distribu
 public class IgniteDistributions {
     private static final DestinationFunctionFactory NO_OP_FACTORY = (t,k) -> 
null;
     private static final DestinationFunctionFactory HASH_FACTORY = (t,k) -> {
+        assert t.mapping() != null && !F.isEmpty(t.mapping().assignments());
+
         int[] fields = k.toIntArray();
 
         ToIntFunction<Object> hashFun = r -> {
@@ -53,7 +57,15 @@ public class IgniteDistributions {
             return hash;
         };
 
-        return r -> t.location.nodes(hashFun.applyAsInt(r));
+        List<List<ClusterNode>> assignments = t.mapping().assignments();
+
+        if (U.assertionsEnabled()) {
+            for (List<ClusterNode> assignment : assignments) {
+                assert F.isEmpty(assignment) || assignment.size() == 1;
+            }
+        }
+
+        return r -> assignments.get(hashFun.applyAsInt(r) % 
assignments.size());
     };
 
 
@@ -88,7 +100,7 @@ public class IgniteDistributions {
 
     public static DestinationFunctionFactory singleTargetFunction() {
         return (t, k) -> {
-            List<ClusterNode> nodes = t.location.nodes();
+            List<ClusterNode> nodes = t.mapping().nodes().subList(0, 1);
 
             return r -> nodes;
         };
@@ -96,7 +108,7 @@ public class IgniteDistributions {
 
     public static DestinationFunctionFactory allTargetsFunction() {
         return (t, k) -> {
-            List<ClusterNode> nodes = t.location.nodes();
+            List<ClusterNode> nodes = t.mapping().nodes();
 
             return r -> nodes;
         };
@@ -104,7 +116,7 @@ public class IgniteDistributions {
 
     public static DestinationFunctionFactory randomTargetFunction() {
         return (t, k) -> {
-            List<ClusterNode> nodes = t.location.nodes();
+            List<ClusterNode> nodes = t.mapping().nodes();
 
             return r -> 
Collections.singletonList(nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())));
         };
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 671bc94..d594bb4 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -231,8 +231,4 @@ public final class Commons {
 
         return res;
     }
-
-//    public String explainToString(RelNode node) {
-//        RelWriterImpl
-//    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
similarity index 59%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
index 7106e81..7a5a145 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
@@ -14,19 +14,13 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.metadata;
+package org.apache.ignite.internal.processors.query.calcite.util;
 
-import java.util.List;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 
 /**
  *
  */
-public class FragmentLocation {
-    public Location location;
-    public List<Receiver> remoteInputs;
-    public GridIntList localInputs;
-    public AffinityTopologyVersion topVer;
+public interface Implementor<T> {
+    T implement(IgniteRel other);
 }
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index c5140c1..f947462 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -35,8 +35,8 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
-import org.apache.ignite.internal.processors.query.calcite.metadata.Location;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
@@ -339,11 +339,11 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
                 return IgniteDistributions.broadcast();
             }
 
-            @Override public Location distributed(int cacheId, 
AffinityTopologyVersion topVer) {
+            @Override public NodesMapping distributed(int cacheId, 
AffinityTopologyVersion topVer) {
                 if (cacheId == CU.cacheId("Developer"))
-                    return new Location(select(nodes, 0,1,2), null, 
Location.HAS_REPLICATED_CACHES);
+                    return new NodesMapping(select(nodes, 0,1,2), null, 
NodesMapping.HAS_REPLICATED_CACHES);
                 if (cacheId == CU.cacheId("Project"))
-                    return new Location(select(nodes, 0,1,2), null, 
Location.HAS_REPLICATED_CACHES);
+                    return new NodesMapping(select(nodes, 0,1,2), null, 
NodesMapping.HAS_REPLICATED_CACHES);
 
                 throw new AssertionError("Unexpected cache id:" + cacheId);
             }
@@ -422,17 +422,17 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
                 return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.hashFunction());
             }
 
-            @Override public Location distributed(int cacheId, 
AffinityTopologyVersion topVer) {
+            @Override public NodesMapping distributed(int cacheId, 
AffinityTopologyVersion topVer) {
                 if (cacheId == CU.cacheId("Developer"))
-                    return new Location(null, Arrays.asList(
+                    return new NodesMapping(null, Arrays.asList(
                         select(nodes, 0,1),
                         select(nodes, 1,2),
                         select(nodes, 2,0),
                         select(nodes, 0,1),
                         select(nodes, 1,2)
-                    ), Location.HAS_PARTITIONED_CACHES);
+                    ), NodesMapping.HAS_PARTITIONED_CACHES);
                 if (cacheId == CU.cacheId("Project"))
-                    return new Location(select(nodes, 0,1), null, 
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+                    return new NodesMapping(select(nodes, 0,1), null, 
(byte)(NodesMapping.HAS_REPLICATED_CACHES | NodesMapping.PARTIALLY_REPLICATED));
 
                 throw new AssertionError("Unexpected cache id:" + cacheId);
             }
@@ -510,17 +510,17 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
                 return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.hashFunction());
             }
 
-            @Override public Location distributed(int cacheId, 
AffinityTopologyVersion topVer) {
+            @Override public NodesMapping distributed(int cacheId, 
AffinityTopologyVersion topVer) {
                 if (cacheId == CU.cacheId("Developer"))
-                    return new Location(null, Arrays.asList(
+                    return new NodesMapping(null, Arrays.asList(
                         select(nodes, 1),
                         select(nodes, 2),
                         select(nodes, 2),
                         select(nodes, 0),
                         select(nodes, 1)
-                    ), Location.HAS_PARTITIONED_CACHES);
+                    ), NodesMapping.HAS_PARTITIONED_CACHES);
                 if (cacheId == CU.cacheId("Project"))
-                    return new Location(select(nodes, 0,1), null, 
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+                    return new NodesMapping(select(nodes, 0,1), null, 
(byte)(NodesMapping.HAS_REPLICATED_CACHES | NodesMapping.PARTIALLY_REPLICATED));
 
                 throw new AssertionError("Unexpected cache id:" + cacheId);
             }
@@ -595,12 +595,12 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
                 return IgniteDistributions.broadcast();
             }
 
-            @Override public Location distributed(int cacheId, 
AffinityTopologyVersion topVer) {
+            @Override public NodesMapping distributed(int cacheId, 
AffinityTopologyVersion topVer) {
                 if (cacheId == CU.cacheId("Developer"))
-                    return new Location(select(nodes, 2), null, 
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+                    return new NodesMapping(select(nodes, 2), null, 
(byte)(NodesMapping.HAS_REPLICATED_CACHES | NodesMapping.PARTIALLY_REPLICATED));
 
                 else if (cacheId == CU.cacheId("Project"))
-                    return new Location(select(nodes, 0,1), null, 
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+                    return new NodesMapping(select(nodes, 0,1), null, 
(byte)(NodesMapping.HAS_REPLICATED_CACHES | NodesMapping.PARTIALLY_REPLICATED));
 
                 throw new AssertionError("Unexpected cache id:" + cacheId);
             }
@@ -679,17 +679,17 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
                 return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.hashFunction());
             }
 
-            @Override public Location distributed(int cacheId, 
AffinityTopologyVersion topVer) {
+            @Override public NodesMapping distributed(int cacheId, 
AffinityTopologyVersion topVer) {
                 if (cacheId == CU.cacheId("Developer"))
-                    return new Location(null, Arrays.asList(
+                    return new NodesMapping(null, Arrays.asList(
                         select(nodes, 0,1),
                         select(nodes, 1,2),
                         select(nodes, 2,0),
                         select(nodes, 0,1),
                         select(nodes, 1,2)
-                    ), Location.HAS_PARTITIONED_CACHES);
+                    ), NodesMapping.HAS_PARTITIONED_CACHES);
                 if (cacheId == CU.cacheId("Project"))
-                    return new Location(select(nodes, 0,1), null, 
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+                    return new NodesMapping(select(nodes, 0,1), null, 
(byte)(NodesMapping.HAS_REPLICATED_CACHES | NodesMapping.PARTIALLY_REPLICATED));
 
                 throw new AssertionError("Unexpected cache id:" + cacheId);
             }
@@ -768,17 +768,17 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
                 return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.hashFunction());
             }
 
-            @Override public Location distributed(int cacheId, 
AffinityTopologyVersion topVer) {
+            @Override public NodesMapping distributed(int cacheId, 
AffinityTopologyVersion topVer) {
                 if (cacheId == CU.cacheId("Developer"))
-                    return new Location(null, Arrays.asList(
+                    return new NodesMapping(null, Arrays.asList(
                         select(nodes, 0,1),
                         select(nodes, 2),
                         select(nodes, 2,0),
                         select(nodes, 0,1),
                         select(nodes, 1,2)
-                    ), Location.HAS_PARTITIONED_CACHES);
+                    ), NodesMapping.HAS_PARTITIONED_CACHES);
                 if (cacheId == CU.cacheId("Project"))
-                    return new Location(select(nodes, 0,1), null, 
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+                    return new NodesMapping(select(nodes, 0,1), null, 
(byte)(NodesMapping.HAS_REPLICATED_CACHES | NodesMapping.PARTIALLY_REPLICATED));
 
                 throw new AssertionError("Unexpected cache id:" + cacheId);
             }
@@ -849,35 +849,35 @@ public class CalciteQueryProcessorTest extends 
GridCommonAbstractTest {
     }
 
     private static class TestRegistry implements LocationRegistry, 
DistributionRegistry {
-        @Override public Location random(AffinityTopologyVersion topVer) {
-            return new Location(select(nodes, 0,1,2,3), null, (byte) 0);
+        @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+            return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
         }
 
-        @Override public Location single(AffinityTopologyVersion topVer) {
-            return new Location(select(nodes, 0), null, (byte) 0);
+        @Override public NodesMapping local() {
+            return new NodesMapping(select(nodes, 0), null, (byte) 0);
         }
 
         @Override public DistributionTrait distribution(int cacheId, RowType 
rowType) {
             return IgniteDistributions.hash(rowType.distributionKeys(), 
IgniteDistributions.hashFunction());
         }
 
-        @Override public Location distributed(int cacheId, 
AffinityTopologyVersion topVer) {
+        @Override public NodesMapping distributed(int cacheId, 
AffinityTopologyVersion topVer) {
             if (cacheId == CU.cacheId("Developer"))
-                return new Location(null, Arrays.asList(
+                return new NodesMapping(null, Arrays.asList(
                     select(nodes, 0,1),
                     select(nodes, 1,2),
                     select(nodes, 2,0),
                     select(nodes, 0,1),
                     select(nodes, 1,2)
-                ), Location.HAS_PARTITIONED_CACHES);
+                ), NodesMapping.HAS_PARTITIONED_CACHES);
             if (cacheId == CU.cacheId("Project"))
-                return new Location(null, Arrays.asList(
+                return new NodesMapping(null, Arrays.asList(
                     select(nodes, 0,1),
                     select(nodes, 1,2),
                     select(nodes, 2,0),
                     select(nodes, 0,1),
                     select(nodes, 1,2)
-                ), Location.HAS_PARTITIONED_CACHES);
+                ), NodesMapping.HAS_PARTITIONED_CACHES);
 
             throw new AssertionError("Unexpected cache id:" + cacheId);
         }

Reply via email to