This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new 4a842bf refactoring
4a842bf is described below
commit 4a842bf4d7559377e55013bc9467fc0f05de0d24
Author: Igor Seliverstov <[email protected]>
AuthorDate: Wed Nov 13 16:09:05 2019 +0300
refactoring
---
.../query/calcite/cluster/RegistryImpl.java | 47 ++++----
.../query/calcite/metadata/FragmentLocation.java | 48 +++++++--
.../calcite/metadata/IgniteMdFragmentLocation.java | 45 +++-----
.../query/calcite/metadata/LocationRegistry.java | 6 +-
.../metadata/{Location.java => NodesMapping.java} | 119 ++++++++++++---------
.../query/calcite/metadata/RelMetadataQueryEx.java | 20 ++++
.../query/calcite/rel/IgniteExchange.java | 6 ++
.../processors/query/calcite/rel/IgniteFilter.java | 6 ++
.../query/calcite/rel/IgniteHashJoin.java | 6 ++
.../query/calcite/rel/IgniteProject.java | 6 ++
.../processors/query/calcite/rel/IgniteRel.java | 3 +
.../query/calcite/rel/IgniteTableScan.java | 10 +-
.../processors/query/calcite/rel/Receiver.java | 15 ++-
.../processors/query/calcite/rel/Sender.java | 12 ++-
.../query/calcite/schema/IgniteTable.java | 20 ++--
.../query/calcite/splitter/Fragment.java | 36 +++----
.../query/calcite/splitter/QueryPlan.java | 5 +-
.../query/calcite/trait/IgniteDistributions.java | 20 +++-
.../processors/query/calcite/util/Commons.java | 4 -
.../Implementor.java} | 14 +--
.../query/calcite/CalciteQueryProcessorTest.java | 64 +++++------
21 files changed, 310 insertions(+), 202 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
index 4ad7d48..1736da0 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
@@ -29,14 +29,15 @@ import
org.apache.ignite.internal.processors.cache.GridCacheContext;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
-import org.apache.ignite.internal.processors.query.calcite.metadata.Location;
import
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
import
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
import
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunctionFactory;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
*
@@ -58,22 +59,22 @@ public class RegistryImpl implements DistributionRegistry,
LocationRegistry {
return IgniteDistributions.hash(rowType.distributionKeys(), new
AffinityFactory(partFun, key));
}
- @Override public Location single(AffinityTopologyVersion topVer) {
- return new
Location(Collections.singletonList(ctx.discovery().localNode()), null, (byte)
0);
+ @Override public NodesMapping local() {
+ return new
NodesMapping(Collections.singletonList(ctx.discovery().localNode()), null,
(byte) 0);
}
- @Override public Location random(AffinityTopologyVersion topVer) {
- return new Location(ctx.discovery().discoCache(topVer).serverNodes(),
null, (byte) 0);
+ @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+ return new
NodesMapping(ctx.discovery().discoCache(topVer).serverNodes(), null, (byte) 0);
}
- @Override public Location distributed(int cacheId, AffinityTopologyVersion
topVer) {
+ @Override public NodesMapping distributed(int cacheId,
AffinityTopologyVersion topVer) {
GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
return cctx.isReplicated() ? replicatedLocation(cctx, topVer) :
partitionedLocation(cctx, topVer);
}
- private Location partitionedLocation(GridCacheContext cctx,
AffinityTopologyVersion topVer) {
- byte flags = Location.HAS_PARTITIONED_CACHES;
+ private NodesMapping partitionedLocation(GridCacheContext cctx,
AffinityTopologyVersion topVer) {
+ byte flags = NodesMapping.HAS_PARTITIONED_CACHES;
List<List<ClusterNode>> assignments =
cctx.affinity().assignments(topVer);
@@ -86,7 +87,7 @@ public class RegistryImpl implements DistributionRegistry,
LocationRegistry {
assignments = assignments0;
}
else if (!cctx.topology().rebalanceFinished(topVer)) {
- flags |= Location.HAS_MOVING_PARTITIONS;
+ flags |= NodesMapping.HAS_MOVING_PARTITIONS;
List<List<ClusterNode>> assignments0 = new
ArrayList<>(assignments.size());
@@ -104,19 +105,19 @@ public class RegistryImpl implements
DistributionRegistry, LocationRegistry {
assignments = assignments0;
}
- return new Location(null, assignments, flags);
+ return new NodesMapping(null, assignments, flags);
}
- private Location replicatedLocation(GridCacheContext cctx,
AffinityTopologyVersion topVer) {
- byte flags = Location.HAS_REPLICATED_CACHES;
+ private NodesMapping replicatedLocation(GridCacheContext cctx,
AffinityTopologyVersion topVer) {
+ byte flags = NodesMapping.HAS_REPLICATED_CACHES;
if (cctx.config().getNodeFilter() != null)
- flags |= Location.PARTIALLY_REPLICATED;
+ flags |= NodesMapping.PARTIALLY_REPLICATED;
List<ClusterNode> nodes =
cctx.discovery().discoCache(topVer).cacheGroupAffinityNodes(cctx.cacheId());
if (!cctx.topology().rebalanceFinished(topVer)) {
- flags |= Location.PARTIALLY_REPLICATED;
+ flags |= NodesMapping.PARTIALLY_REPLICATED;
List<ClusterNode> nodes0 = new ArrayList<>(nodes.size());
@@ -135,7 +136,7 @@ public class RegistryImpl implements DistributionRegistry,
LocationRegistry {
nodes = nodes0;
}
- return new Location(nodes, null, flags);
+ return new NodesMapping(nodes, null, flags);
}
private static class AffinityFactory implements DestinationFunctionFactory
{
@@ -148,9 +149,17 @@ public class RegistryImpl implements DistributionRegistry,
LocationRegistry {
}
@Override public DestinationFunction create(FragmentLocation
targetLocation, ImmutableIntList keys) {
- assert keys.size() == 1 && targetLocation.location != null;
+ assert keys.size() == 1 && targetLocation.mapping() != null &&
!F.isEmpty(targetLocation.mapping().assignments());
- return create(targetLocation.location, partFun, keys.getInt(0));
+ List<List<ClusterNode>> assignments =
targetLocation.mapping().assignments();
+
+ if (U.assertionsEnabled()) {
+ for (List<ClusterNode> assignment : assignments) {
+ assert F.isEmpty(assignment) || assignment.size() == 1;
+ }
+ }
+
+ return create(assignments, partFun, keys.getInt(0));
}
@Override public boolean equals(Object o) {
@@ -166,8 +175,8 @@ public class RegistryImpl implements DistributionRegistry,
LocationRegistry {
return key.hashCode();
}
- private static DestinationFunction create(Location location,
ToIntFunction<Object> partFun, int affField) {
- return row -> location.nodes(partFun.applyAsInt(((Object[])
row)[affField]));
+ private static DestinationFunction create(List<List<ClusterNode>>
assignments, ToIntFunction<Object> partFun, int affField) {
+ return row -> assignments.get(partFun.applyAsInt(((Object[])
row)[affField]));
}
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
index 7106e81..26c203e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
@@ -16,17 +16,53 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
-import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-import org.apache.ignite.internal.util.GridIntList;
/**
*
*/
public class FragmentLocation {
- public Location location;
- public List<Receiver> remoteInputs;
- public GridIntList localInputs;
- public AffinityTopologyVersion topVer;
+ private NodesMapping mapping;
+
+ private final ImmutableList<Receiver> remoteInputs;
+ private final ImmutableIntList localInputs;
+ private final AffinityTopologyVersion topVer;
+
+ public FragmentLocation(ImmutableList<Receiver> remoteInputs,
AffinityTopologyVersion topVer) {
+ this(null, remoteInputs, null, topVer);
+ }
+
+ public FragmentLocation(NodesMapping mapping, ImmutableIntList
localInputs, AffinityTopologyVersion topVer) {
+ this(mapping, null, localInputs, topVer);
+ }
+
+ public FragmentLocation(NodesMapping mapping, ImmutableList<Receiver>
remoteInputs, ImmutableIntList localInputs, AffinityTopologyVersion topVer) {
+ this.mapping = mapping;
+ this.remoteInputs = remoteInputs;
+ this.localInputs = localInputs;
+ this.topVer = topVer;
+ }
+
+ public NodesMapping mapping() {
+ return mapping;
+ }
+
+ public void mapping(NodesMapping mapping) {
+ this.mapping = mapping;
+ }
+
+ public ImmutableList<Receiver> remoteInputs() {
+ return remoteInputs;
+ }
+
+ public ImmutableIntList localInputs() {
+ return localInputs;
+ }
+
+ public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
index 0c20e59..5e6e8a9 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
@@ -16,8 +16,7 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
-import java.util.Collections;
-import java.util.List;
+import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.volcano.RelSubset;
@@ -29,15 +28,14 @@ import org.apache.calcite.rel.metadata.MetadataHandler;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentLocationMetadata;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.Edge;
import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
-import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
@@ -79,9 +77,9 @@ public class IgniteMdFragmentLocation implements
MetadataHandler<FragmentLocatio
}
catch (LocationMappingException e) {
// a replicated cache is cheaper to redistribute
- if (!leftLoc.location.hasPartitionedCaches())
+ if (!leftLoc.mapping().hasPartitionedCaches())
throw planningException(rel, e, true);
- else if (!rightLoc.location.hasPartitionedCaches())
+ else if (!rightLoc.mapping().hasPartitionedCaches())
throw planningException(rel, e, false);
// both sub-trees have partitioned sources, less cost is better
@@ -104,12 +102,8 @@ public class IgniteMdFragmentLocation implements
MetadataHandler<FragmentLocatio
}
public FragmentLocation getLocation(Receiver rel, RelMetadataQuery mq) {
- FragmentLocation res = new FragmentLocation();
-
- res.remoteInputs = Collections.singletonList(rel);
- res.topVer =
rel.getCluster().getPlanner().getContext().unwrap(AffinityTopologyVersion.class);
-
- return res;
+ return new FragmentLocation(ImmutableList.of(rel),
+
rel.getCluster().getPlanner().getContext().unwrap(AffinityTopologyVersion.class));
}
public FragmentLocation getLocation(IgniteTableScan rel, RelMetadataQuery
mq) {
@@ -121,17 +115,13 @@ public class IgniteMdFragmentLocation implements
MetadataHandler<FragmentLocatio
}
private static FragmentLocation merge(FragmentLocation left,
FragmentLocation right) throws LocationMappingException {
- FragmentLocation res = new FragmentLocation();
-
- res.location = merge(left.location, right.location);
- res.remoteInputs = merge(left.remoteInputs, right.remoteInputs);
- res.localInputs = merge(left.localInputs, right.localInputs);
- res.topVer = U.firstNotNull(left.topVer, right.topVer);
-
- return res;
+ return new FragmentLocation(merge(left.mapping(), right.mapping()),
+ merge(left.remoteInputs(), right.remoteInputs()),
+ merge(left.localInputs(), right.localInputs()),
+ U.firstNotNull(left.topologyVersion(), right.topologyVersion()));
}
- private static Location merge(Location left, Location right) throws
LocationMappingException {
+ private static NodesMapping merge(NodesMapping left, NodesMapping right)
throws LocationMappingException {
if (left == null)
return right;
if (right == null)
@@ -140,22 +130,21 @@ public class IgniteMdFragmentLocation implements
MetadataHandler<FragmentLocatio
return left.mergeWith(right);
}
- private static <T> List<T> merge(List<T> left, List<T> right) {
+ private static <T> ImmutableList<T> merge(ImmutableList<T> left,
ImmutableList<T> right) {
if (left == null)
return right;
if (right == null)
return left;
- return Commons.union(left, right);
+ return ImmutableList.<T>builder().addAll(left).addAll(right).build();
}
- private static GridIntList merge(GridIntList left, GridIntList right) {
+ private static ImmutableIntList merge(ImmutableIntList left,
ImmutableIntList right) {
if (left == null)
return right;
+ if (right == null)
+ return left;
- if (right != null)
- left.addAll(right);
-
- return left;
+ return left.appendAll(right);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
index d81e440..bf62302 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
@@ -22,7 +22,7 @@ import
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
*
*/
public interface LocationRegistry {
- Location single(AffinityTopologyVersion topVer); // returns local node
with single partition
- Location random(AffinityTopologyVersion topVer); // returns random
distribution, partitions count depends on nodes count
- Location distributed(int cacheId, AffinityTopologyVersion topVer); //
returns cache distribution
+ NodesMapping local(); // returns local node with single partition
+ NodesMapping random(AffinityTopologyVersion topVer); // returns random
distribution, partitions count depends on nodes count
+ NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer); //
returns cache distribution
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/Location.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
similarity index 74%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/Location.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
index c83d16b..d40af2a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/Location.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
@@ -20,26 +20,28 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
*
*/
-public class Location {
- public static byte HAS_MOVING_PARTITIONS = 0x1;
- public static byte HAS_REPLICATED_CACHES = 0x2;
- public static byte HAS_PARTITIONED_CACHES = 0x4;
- public static byte PARTIALLY_REPLICATED = 0x8;
- public static byte DEDUPLICATED = 0x16;
+public class NodesMapping {
+ public static final byte HAS_MOVING_PARTITIONS = 0x1;
+ public static final byte HAS_REPLICATED_CACHES = 0x2;
+ public static final byte HAS_PARTITIONED_CACHES = 0x4;
+ public static final byte PARTIALLY_REPLICATED = 0x8;
+ public static final byte DEDUPLICATED = 0x16;
private final List<ClusterNode> nodes;
private final List<List<ClusterNode>> assignments;
private final byte flags;
- public Location(List<ClusterNode> nodes, List<List<ClusterNode>>
assignments, byte flags) {
+ public NodesMapping(List<ClusterNode> nodes, List<List<ClusterNode>>
assignments, byte flags) {
this.nodes = nodes;
this.assignments = assignments;
this.flags = flags;
@@ -49,15 +51,15 @@ public class Location {
return nodes;
}
- public List<ClusterNode> nodes(int part) {
- return assignments.get(part % assignments.size());
+ public List<List<ClusterNode>> assignments() {
+ return assignments;
}
- public Location mergeWith(Location other) throws LocationMappingException {
+ public NodesMapping mergeWith(NodesMapping other) throws
LocationMappingException {
byte flags = (byte) (this.flags | other.flags);
if ((flags & PARTIALLY_REPLICATED) == 0)
- return new Location(U.firstNotNull(nodes, other.nodes),
mergeAssignments(other, null), flags);
+ return new NodesMapping(U.firstNotNull(nodes, other.nodes),
mergeAssignments(other, null), flags);
List<ClusterNode> nodes;
@@ -71,10 +73,65 @@ public class Location {
if (nodes != null && nodes.isEmpty())
throw new LocationMappingException("Failed to map fragment to
location.");
- return new Location(nodes, mergeAssignments(other, nodes), flags);
+ return new NodesMapping(nodes, mergeAssignments(other, nodes), flags);
+ }
+
+ public NodesMapping deduplicate() throws LocationMappingException {
+ if (assignments == null || !excessive())
+ return this;
+
+ HashSet<ClusterNode> nodes0 = new HashSet<>();
+ List<List<ClusterNode>> assignments0 = new
ArrayList<>(assignments.size());
+
+ for (List<ClusterNode> partNodes : assignments) {
+ ClusterNode node = F.first(partNodes);
+
+ if (node == null)
+ throw new LocationMappingException("Failed to map fragment to
location.");
+
+ assignments0.add(Collections.singletonList(node));
+ nodes0.add(node);
+ }
+
+ return new NodesMapping(new ArrayList<>(nodes0), assignments0,
(byte)(flags | DEDUPLICATED));
+ }
+
+ public int[] partitions(ClusterNode node) {
+ if (assignments == null)
+ return null;
+
+ GridIntList parts = new GridIntList(assignments.size());
+
+ for (int i = 0; i < assignments.size(); i++) {
+ List<ClusterNode> assignment = assignments.get(i);
+ if (Objects.equals(node, F.first(assignment)))
+ parts.add(i);
+ }
+
+ return parts.array();
+ }
+
+ public boolean excessive() {
+ return (flags & DEDUPLICATED) == 0;
+ }
+
+ public boolean hasMovingPartitions() {
+ return (flags & HAS_MOVING_PARTITIONS) == HAS_MOVING_PARTITIONS;
+ }
+
+ public boolean hasReplicatedCaches() {
+ return (flags & HAS_REPLICATED_CACHES) == HAS_REPLICATED_CACHES;
+ }
+
+ public boolean hasPartitionedCaches() {
+ return (flags & HAS_PARTITIONED_CACHES) == HAS_PARTITIONED_CACHES;
}
- private List<List<ClusterNode>> mergeAssignments(Location other,
List<ClusterNode> nodes) throws LocationMappingException {
+ public boolean partiallyReplicated() {
+ return (flags & PARTIALLY_REPLICATED) == PARTIALLY_REPLICATED;
+ }
+
+ private List<List<ClusterNode>> mergeAssignments(NodesMapping other,
List<ClusterNode> nodes) throws LocationMappingException {
byte flags = (byte) (this.flags | other.flags);
List<List<ClusterNode>> left = assignments, right = other.assignments;
if (left == null && right == null)
@@ -126,40 +183,4 @@ public class Location {
return assignments;
}
-
- public Location deduplicate() throws LocationMappingException {
- if (assignments == null || (flags & DEDUPLICATED) == DEDUPLICATED)
- return this;
-
- HashSet<ClusterNode> nodes0 = new HashSet<>();
- List<List<ClusterNode>> assignments0 = new
ArrayList<>(assignments.size());
-
- for (List<ClusterNode> partNodes : assignments) {
- ClusterNode node = F.first(partNodes);
-
- if (node == null)
- throw new LocationMappingException("Failed to map fragment to
location.");
-
- assignments0.add(Collections.singletonList(node));
- nodes0.add(node);
- }
-
- return new Location(new ArrayList<>(nodes0), assignments0,
(byte)(flags | DEDUPLICATED));
- }
-
- public boolean hasMovingPartitions() {
- return (flags & HAS_MOVING_PARTITIONS) == HAS_MOVING_PARTITIONS;
- }
-
- public boolean hasReplicatedCaches() {
- return (flags & HAS_REPLICATED_CACHES) == HAS_REPLICATED_CACHES;
- }
-
- public boolean hasPartitionedCaches() {
- return (flags & HAS_PARTITIONED_CACHES) == HAS_PARTITIONED_CACHES;
- }
-
- public boolean partiallyReplicated() {
- return (flags & PARTIALLY_REPLICATED) == PARTIALLY_REPLICATED;
- }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
index fc6e4fa..16d662d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -16,9 +16,17 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
+import java.util.Arrays;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
import org.jetbrains.annotations.NotNull;
@@ -29,6 +37,18 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
private static final RelMetadataQueryEx PROTO = new RelMetadataQueryEx();
private static final JaninoRelMetadataProvider PROVIDER =
JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER);
+ static {
+ PROVIDER.register(Arrays.asList(
+ IgniteExchange.class,
+ IgniteFilter.class,
+ IgniteHashJoin.class,
+ IgniteProject.class,
+ IgniteTableScan.class,
+ Receiver.class,
+ Sender.class
+ ));
+ }
+
private IgniteMetadata.DistributionTraitMetadata.Handler
distributionTraitHandler;
private IgniteMetadata.FragmentLocationMetadata.Handler
sourceDistributionHandler;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
index 5fce676..13f9b92 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.Util;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
/**
*
@@ -54,6 +55,11 @@ public final class IgniteExchange extends SingleRel
implements IgniteRel {
return new IgniteExchange(getCluster(), traitSet, sole(inputs));
}
+ /** {@inheritDoc} */
+ @Override public <T> T implement(Implementor<T> implementor) {
+ return implementor.implement(this);
+ }
+
@Override public RelWriter explainTerms(RelWriter pw) {
return super.explainTerms(pw)
.item("distribution",
getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
index c1991e8..6ddafb8 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
@@ -28,6 +28,7 @@ import org.apache.calcite.rex.RexNode;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
public final class IgniteFilter extends Filter implements IgniteRel {
private final Set<CorrelationId> variablesSet;
@@ -47,6 +48,11 @@ public final class IgniteFilter extends Filter implements
IgniteRel {
return new IgniteFilter(getCluster(), traitSet, input, condition,
variablesSet);
}
+ /** {@inheritDoc} */
+ @Override public <T> T implement(Implementor<T> implementor) {
+ return implementor.implement(this);
+ }
+
@Override public RelWriter explainTerms(RelWriter pw) {
return super.explainTerms(pw)
.itemIf("variablesSet", variablesSet, !variablesSet.isEmpty());
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
index 6450abd..dd3c725 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
public final class IgniteHashJoin extends Join implements IgniteRel {
private final boolean semiJoinDone;
@@ -47,6 +48,11 @@ public final class IgniteHashJoin extends Join implements
IgniteRel {
return new IgniteHashJoin(getCluster(), traitSet, left, right,
conditionExpr, variablesSet, joinType, semiJoinDone);
}
+ /** {@inheritDoc} */
+ @Override public <T> T implement(Implementor<T> implementor) {
+ return implementor.implement(this);
+ }
+
@Override public RelWriter explainTerms(RelWriter pw) {
// Don't ever print semiJoinDone=false. This way, we
// don't clutter things up in optimizers that don't use semi-joins.
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
index 1f3eaf2..e8ce5c1 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rex.RexNode;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
public final class IgniteProject extends Project implements IgniteRel {
public IgniteProject(
@@ -42,6 +43,11 @@ public final class IgniteProject extends Project implements
IgniteRel {
return new IgniteProject(getCluster(), traitSet, input, projects, rowType);
}
+ /** {@inheritDoc} */
+ @Override public <T> T implement(Implementor<T> implementor) {
+ return implementor.implement(this);
+ }
+
public static IgniteProject create(Project project, RelNode input) {
RelTraitSet traits = project.getTraitSet()
.replace(IgniteRel.IGNITE_CONVENTION)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
index d444f4b..211dc20 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
@@ -20,6 +20,7 @@ package
org.apache.ignite.internal.processors.query.calcite.rel;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
/**
*
@@ -32,4 +33,6 @@ public interface IgniteRel extends RelNode {
&& toTraits.containsIfApplicable(IGNITE_CONVENTION); //
Enables trait definition conversion
}
};
+
+ <T> T implement(Implementor<T> implementor);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
index 4f854c3..969180d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
public final class IgniteTableScan extends TableScan implements IgniteRel {
public IgniteTableScan(RelOptCluster cluster, RelTraitSet traitSet,
RelOptTable table) {
@@ -37,8 +37,12 @@ public final class IgniteTableScan extends TableScan
implements IgniteRel {
return this;
}
+ /** {@inheritDoc} */
+ @Override public <T> T implement(Implementor<T> implementor) {
+ return implementor.implement(this);
+ }
+
public FragmentLocation location() {
- boolean local = !getTraitSet().isEnabled(DistributionTraitDef.INSTANCE);
- return
getTable().unwrap(IgniteTable.class).location(getCluster().getPlanner().getContext(),
local);
+ return
getTable().unwrap(IgniteTable.class).location(getCluster().getPlanner().getContext());
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
index 74c9d1b..7e5747c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
@@ -21,9 +21,11 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentLocation;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
/**
*
@@ -50,10 +52,15 @@ public final class Receiver extends SingleRel implements
IgniteRel {
return new Receiver(getCluster(), traitSet, (Sender) sole(inputs));
}
- public void init(FragmentLocation targetDistribution, RelMetadataQueryEx
mq) {
- getInput().init(targetDistribution,
getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
+ /** {@inheritDoc} */
+ @Override public <T> T implement(Implementor<T> implementor) {
+ return implementor.implement(this);
+ }
- sourceDistribution = getInput().location(mq);
+ public void init(FragmentLocation targetDistribution, RelMetadataQuery mq)
{
+ sourceDistribution = IgniteMdFragmentLocation.location(getInput(), mq);
+
+ getInput().init(targetDistribution,
getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
}
public FragmentLocation sourceDistribution() {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
index f53b953..959f2cb 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
@@ -23,9 +23,10 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentLocation;
import
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
/**
*
@@ -51,6 +52,11 @@ public final class Sender extends SingleRel implements
IgniteRel {
return new Sender(getCluster(), traitSet, sole(inputs));
}
+ /** {@inheritDoc} */
+ @Override public <T> T implement(Implementor<T> implementor) {
+ return implementor.implement(this);
+ }
+
public void init(FragmentLocation targetLocation, DistributionTrait
targetDistribution) {
this.targetLocation = targetLocation;
this.targetDistribution = targetDistribution;
@@ -58,7 +64,7 @@ public final class Sender extends SingleRel implements
IgniteRel {
public DestinationFunction targetFunction() {
if (destinationFunction == null) {
- assert targetLocation != null && targetLocation.location != null
&& targetDistribution != null;
+ assert targetLocation != null && targetLocation.mapping() != null
&& targetDistribution != null;
destinationFunction =
targetDistribution.destinationFunctionFactory().create(targetLocation,
targetDistribution.keys());
}
@@ -68,7 +74,7 @@ public final class Sender extends SingleRel implements
IgniteRel {
public FragmentLocation location(RelMetadataQuery mq) {
if (location == null)
- location =
RelMetadataQueryEx.wrap(mq).getFragmentLocation(getInput());
+ location = IgniteMdFragmentLocation.location(getInput(), mq);
return location;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index f0d1a2a..3ba7b50 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -26,15 +26,16 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
import
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.typedef.internal.CU;
/** */
@@ -80,21 +81,12 @@ public class IgniteTable extends AbstractTable implements
TranslatableTable {
return
distributionRegistry(context).distribution(CU.cacheId(cacheName), rowType);
}
- public FragmentLocation location(Context context, boolean local) {
+ public FragmentLocation location(Context ctx) {
int cacheId = CU.cacheId(cacheName);
+ AffinityTopologyVersion topVer = topologyVersion(ctx);
+ NodesMapping mapping = locationRegistry(ctx).distributed(cacheId,
topVer);
- FragmentLocation res = new FragmentLocation();
-
- GridIntList localInputs = new GridIntList();
- localInputs.add(cacheId);
- res.localInputs = localInputs;
-
- if (!local)
- res.location = locationRegistry(context).distributed(cacheId,
topologyVersion(context));
-
- res.topVer = topologyVersion(context);
-
- return res;
+ return new FragmentLocation(mapping, ImmutableIntList.of(cacheId),
topVer);
}
private LocationRegistry locationRegistry(Context ctx) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index 2fd35d7..75bc37e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -18,12 +18,13 @@ package
org.apache.ignite.internal.processors.query.calcite.splitter;
import org.apache.calcite.plan.Context;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentLocation;
import
org.apache.ignite.internal.processors.query.calcite.metadata.LocationMappingException;
import
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
import org.apache.ignite.internal.util.typedef.F;
@@ -40,36 +41,28 @@ public class Fragment {
this.rel = rel;
}
- public void init(Context ctx) {
- RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
+ public void init(Context ctx, RelMetadataQuery mq) {
+ fragmentLocation = IgniteMdFragmentLocation.location(rel, mq);
- fragmentLocation = mq.getFragmentLocation(rel);
-
- AffinityTopologyVersion topVer = topologyVersion(ctx);
-
- if (fragmentLocation.location == null) {
- if (!isRoot())
- fragmentLocation.location = registry(ctx).random(topVer);
- else if (!F.isEmpty(fragmentLocation.remoteInputs))
- fragmentLocation.location = registry(ctx).single(topVer);
- }
+ if (fragmentLocation.mapping() == null)
+ fragmentLocation.mapping(remote() ?
registry(ctx).random(topologyVersion(ctx)) : registry(ctx).local());
else {
try {
- fragmentLocation.location =
fragmentLocation.location.deduplicate();
+
fragmentLocation.mapping(fragmentLocation.mapping().deduplicate());
}
catch (LocationMappingException e) {
throw new IgniteSQLException("Failed to map fragment to
location, partition lost.", e);
}
}
- if (!F.isEmpty(fragmentLocation.remoteInputs)) {
- for (Receiver input : fragmentLocation.remoteInputs)
+ if (!F.isEmpty(fragmentLocation.remoteInputs())) {
+ for (Receiver input : fragmentLocation.remoteInputs())
input.init(fragmentLocation, mq);
}
}
- private boolean isRoot() {
- return !(rel instanceof Sender);
+ private boolean remote() {
+ return rel instanceof Sender;
}
private LocationRegistry registry(Context ctx) {
@@ -81,13 +74,12 @@ public class Fragment {
}
public void reset() {
- if (rel instanceof Sender)
+ if (remote())
((Sender) rel).reset();
- if (fragmentLocation != null &&
!F.isEmpty(fragmentLocation.remoteInputs)) {
- for (Receiver receiver : fragmentLocation.remoteInputs) {
+ if (fragmentLocation != null &&
!F.isEmpty(fragmentLocation.remoteInputs())) {
+ for (Receiver receiver : fragmentLocation.remoteInputs())
receiver.reset();
- }
}
fragmentLocation = null;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index b944a9b..a95c7fd 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -23,6 +23,7 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import
org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
import org.apache.ignite.internal.processors.query.calcite.util.Edge;
@@ -40,10 +41,12 @@ public class QueryPlan {
public void init(Context ctx) {
int i = 0;
+ RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
+
while (true) {
try {
for (Fragment fragment : fragments)
- fragment.init(ctx);
+ fragment.init(ctx, mq);
break;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 57ccaf8..1237421 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -28,6 +28,8 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.cluster.ClusterNode;
import
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import static
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
@@ -37,6 +39,8 @@ import static
org.apache.ignite.internal.processors.query.calcite.trait.Distribu
public class IgniteDistributions {
private static final DestinationFunctionFactory NO_OP_FACTORY = (t,k) ->
null;
private static final DestinationFunctionFactory HASH_FACTORY = (t,k) -> {
+ assert t.mapping() != null && !F.isEmpty(t.mapping().assignments());
+
int[] fields = k.toIntArray();
ToIntFunction<Object> hashFun = r -> {
@@ -53,7 +57,15 @@ public class IgniteDistributions {
return hash;
};
- return r -> t.location.nodes(hashFun.applyAsInt(r));
+ List<List<ClusterNode>> assignments = t.mapping().assignments();
+
+ if (U.assertionsEnabled()) {
+ for (List<ClusterNode> assignment : assignments) {
+ assert F.isEmpty(assignment) || assignment.size() == 1;
+ }
+ }
+
+ return r -> assignments.get(hashFun.applyAsInt(r) %
assignments.size());
};
@@ -88,7 +100,7 @@ public class IgniteDistributions {
public static DestinationFunctionFactory singleTargetFunction() {
return (t, k) -> {
- List<ClusterNode> nodes = t.location.nodes();
+ List<ClusterNode> nodes = t.mapping().nodes().subList(0, 1);
return r -> nodes;
};
@@ -96,7 +108,7 @@ public class IgniteDistributions {
public static DestinationFunctionFactory allTargetsFunction() {
return (t, k) -> {
- List<ClusterNode> nodes = t.location.nodes();
+ List<ClusterNode> nodes = t.mapping().nodes();
return r -> nodes;
};
@@ -104,7 +116,7 @@ public class IgniteDistributions {
public static DestinationFunctionFactory randomTargetFunction() {
return (t, k) -> {
- List<ClusterNode> nodes = t.location.nodes();
+ List<ClusterNode> nodes = t.mapping().nodes();
return r ->
Collections.singletonList(nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())));
};
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 671bc94..d594bb4 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -231,8 +231,4 @@ public final class Commons {
return res;
}
-
-// public String explainToString(RelNode node) {
-// RelWriterImpl
-// }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
similarity index 59%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
index 7106e81..7a5a145 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
@@ -14,19 +14,13 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.metadata;
+package org.apache.ignite.internal.processors.query.calcite.util;
-import java.util.List;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
/**
*
*/
-public class FragmentLocation {
- public Location location;
- public List<Receiver> remoteInputs;
- public GridIntList localInputs;
- public AffinityTopologyVersion topVer;
+public interface Implementor<T> {
+ T implement(IgniteRel other);
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index c5140c1..f947462 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -35,8 +35,8 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
-import org.apache.ignite.internal.processors.query.calcite.metadata.Location;
import
org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
@@ -339,11 +339,11 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
return IgniteDistributions.broadcast();
}
- @Override public Location distributed(int cacheId,
AffinityTopologyVersion topVer) {
+ @Override public NodesMapping distributed(int cacheId,
AffinityTopologyVersion topVer) {
if (cacheId == CU.cacheId("Developer"))
- return new Location(select(nodes, 0,1,2), null,
Location.HAS_REPLICATED_CACHES);
+ return new NodesMapping(select(nodes, 0,1,2), null,
NodesMapping.HAS_REPLICATED_CACHES);
if (cacheId == CU.cacheId("Project"))
- return new Location(select(nodes, 0,1,2), null,
Location.HAS_REPLICATED_CACHES);
+ return new NodesMapping(select(nodes, 0,1,2), null,
NodesMapping.HAS_REPLICATED_CACHES);
throw new AssertionError("Unexpected cache id:" + cacheId);
}
@@ -422,17 +422,17 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
return IgniteDistributions.hash(rowType.distributionKeys(),
IgniteDistributions.hashFunction());
}
- @Override public Location distributed(int cacheId,
AffinityTopologyVersion topVer) {
+ @Override public NodesMapping distributed(int cacheId,
AffinityTopologyVersion topVer) {
if (cacheId == CU.cacheId("Developer"))
- return new Location(null, Arrays.asList(
+ return new NodesMapping(null, Arrays.asList(
select(nodes, 0,1),
select(nodes, 1,2),
select(nodes, 2,0),
select(nodes, 0,1),
select(nodes, 1,2)
- ), Location.HAS_PARTITIONED_CACHES);
+ ), NodesMapping.HAS_PARTITIONED_CACHES);
if (cacheId == CU.cacheId("Project"))
- return new Location(select(nodes, 0,1), null,
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+ return new NodesMapping(select(nodes, 0,1), null,
(byte)(NodesMapping.HAS_REPLICATED_CACHES | NodesMapping.PARTIALLY_REPLICATED));
throw new AssertionError("Unexpected cache id:" + cacheId);
}
@@ -510,17 +510,17 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
return IgniteDistributions.hash(rowType.distributionKeys(),
IgniteDistributions.hashFunction());
}
- @Override public Location distributed(int cacheId,
AffinityTopologyVersion topVer) {
+ @Override public NodesMapping distributed(int cacheId,
AffinityTopologyVersion topVer) {
if (cacheId == CU.cacheId("Developer"))
- return new Location(null, Arrays.asList(
+ return new NodesMapping(null, Arrays.asList(
select(nodes, 1),
select(nodes, 2),
select(nodes, 2),
select(nodes, 0),
select(nodes, 1)
- ), Location.HAS_PARTITIONED_CACHES);
+ ), NodesMapping.HAS_PARTITIONED_CACHES);
if (cacheId == CU.cacheId("Project"))
- return new Location(select(nodes, 0,1), null,
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+ return new NodesMapping(select(nodes, 0,1), null,
(byte)(NodesMapping.HAS_REPLICATED_CACHES | NodesMapping.PARTIALLY_REPLICATED));
throw new AssertionError("Unexpected cache id:" + cacheId);
}
@@ -595,12 +595,12 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
return IgniteDistributions.broadcast();
}
- @Override public Location distributed(int cacheId,
AffinityTopologyVersion topVer) {
+ @Override public NodesMapping distributed(int cacheId,
AffinityTopologyVersion topVer) {
if (cacheId == CU.cacheId("Developer"))
- return new Location(select(nodes, 2), null,
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+ return new NodesMapping(select(nodes, 2), null,
(byte)(NodesMapping.HAS_REPLICATED_CACHES | NodesMapping.PARTIALLY_REPLICATED));
else if (cacheId == CU.cacheId("Project"))
- return new Location(select(nodes, 0,1), null,
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+ return new NodesMapping(select(nodes, 0,1), null,
(byte)(NodesMapping.HAS_REPLICATED_CACHES | NodesMapping.PARTIALLY_REPLICATED));
throw new AssertionError("Unexpected cache id:" + cacheId);
}
@@ -679,17 +679,17 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
return IgniteDistributions.hash(rowType.distributionKeys(),
IgniteDistributions.hashFunction());
}
- @Override public Location distributed(int cacheId,
AffinityTopologyVersion topVer) {
+ @Override public NodesMapping distributed(int cacheId,
AffinityTopologyVersion topVer) {
if (cacheId == CU.cacheId("Developer"))
- return new Location(null, Arrays.asList(
+ return new NodesMapping(null, Arrays.asList(
select(nodes, 0,1),
select(nodes, 1,2),
select(nodes, 2,0),
select(nodes, 0,1),
select(nodes, 1,2)
- ), Location.HAS_PARTITIONED_CACHES);
+ ), NodesMapping.HAS_PARTITIONED_CACHES);
if (cacheId == CU.cacheId("Project"))
- return new Location(select(nodes, 0,1), null,
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+ return new NodesMapping(select(nodes, 0,1), null,
(byte)(NodesMapping.HAS_REPLICATED_CACHES | NodesMapping.PARTIALLY_REPLICATED));
throw new AssertionError("Unexpected cache id:" + cacheId);
}
@@ -768,17 +768,17 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
return IgniteDistributions.hash(rowType.distributionKeys(),
IgniteDistributions.hashFunction());
}
- @Override public Location distributed(int cacheId,
AffinityTopologyVersion topVer) {
+ @Override public NodesMapping distributed(int cacheId,
AffinityTopologyVersion topVer) {
if (cacheId == CU.cacheId("Developer"))
- return new Location(null, Arrays.asList(
+ return new NodesMapping(null, Arrays.asList(
select(nodes, 0,1),
select(nodes, 2),
select(nodes, 2,0),
select(nodes, 0,1),
select(nodes, 1,2)
- ), Location.HAS_PARTITIONED_CACHES);
+ ), NodesMapping.HAS_PARTITIONED_CACHES);
if (cacheId == CU.cacheId("Project"))
- return new Location(select(nodes, 0,1), null,
(byte)(Location.HAS_REPLICATED_CACHES | Location.PARTIALLY_REPLICATED));
+ return new NodesMapping(select(nodes, 0,1), null,
(byte)(NodesMapping.HAS_REPLICATED_CACHES | NodesMapping.PARTIALLY_REPLICATED));
throw new AssertionError("Unexpected cache id:" + cacheId);
}
@@ -849,35 +849,35 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
}
private static class TestRegistry implements LocationRegistry,
DistributionRegistry {
- @Override public Location random(AffinityTopologyVersion topVer) {
- return new Location(select(nodes, 0,1,2,3), null, (byte) 0);
+ @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+ return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
}
- @Override public Location single(AffinityTopologyVersion topVer) {
- return new Location(select(nodes, 0), null, (byte) 0);
+ @Override public NodesMapping local() {
+ return new NodesMapping(select(nodes, 0), null, (byte) 0);
}
@Override public DistributionTrait distribution(int cacheId, RowType
rowType) {
return IgniteDistributions.hash(rowType.distributionKeys(),
IgniteDistributions.hashFunction());
}
- @Override public Location distributed(int cacheId,
AffinityTopologyVersion topVer) {
+ @Override public NodesMapping distributed(int cacheId,
AffinityTopologyVersion topVer) {
if (cacheId == CU.cacheId("Developer"))
- return new Location(null, Arrays.asList(
+ return new NodesMapping(null, Arrays.asList(
select(nodes, 0,1),
select(nodes, 1,2),
select(nodes, 2,0),
select(nodes, 0,1),
select(nodes, 1,2)
- ), Location.HAS_PARTITIONED_CACHES);
+ ), NodesMapping.HAS_PARTITIONED_CACHES);
if (cacheId == CU.cacheId("Project"))
- return new Location(null, Arrays.asList(
+ return new NodesMapping(null, Arrays.asList(
select(nodes, 0,1),
select(nodes, 1,2),
select(nodes, 2,0),
select(nodes, 0,1),
select(nodes, 1,2)
- ), Location.HAS_PARTITIONED_CACHES);
+ ), NodesMapping.HAS_PARTITIONED_CACHES);
throw new AssertionError("Unexpected cache id:" + cacheId);
}