Repository: ignite Updated Branches: refs/heads/ignite-5075-cacheStart 861b34b29 -> d24b08b65
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d24b08b6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d24b08b6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d24b08b6 Branch: refs/heads/ignite-5075-cacheStart Commit: d24b08b65aeee089894b62fa8619d0c24cbae23b Parents: 861b34b Author: sboikov <[email protected]> Authored: Thu May 11 16:18:32 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 11 16:18:32 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 33 +++++++------ .../internal/processors/cache/CacheData.java | 14 ------ .../processors/cache/ClusterCachesInfo.java | 51 ++++++++------------ .../cache/DynamicCacheDescriptor.java | 31 +++--------- .../processors/cache/GridCacheContext.java | 15 ------ .../processors/cache/GridCacheIoManager.java | 6 ++- .../GridCachePartitionExchangeManager.java | 4 +- .../processors/cache/GridCacheProcessor.java | 9 ---- .../dht/GridDhtAffinityAssignmentRequest.java | 32 ++++++------ .../dht/GridDhtAffinityAssignmentResponse.java | 36 ++++++++++++-- .../dht/GridDhtAssignmentFetchFuture.java | 50 ++++++++++--------- .../dht/GridDhtPartitionTopologyImpl.java | 4 +- .../dht/preloader/GridDhtPreloader.java | 4 +- .../continuous/CacheContinuousQueryManager.java | 1 - .../loadtests/hashmap/GridCacheTestContext.java | 1 - 15 files changed, 131 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 8c275e0..bd41ccc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -91,8 +91,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private final Object mux = new Object(); /** Pending affinity assignment futures. */ - private final ConcurrentMap<T2<Integer, AffinityTopologyVersion>, GridDhtAssignmentFetchFuture> - pendingAssignmentFetchFuts = new ConcurrentHashMap8<>(); + private final ConcurrentMap<Long, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts = + new ConcurrentHashMap8<>(); /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @@ -118,6 +118,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** + * @param cacheId Cache ID. + * @return Cache start topology version. + */ + public AffinityTopologyVersion localStartVersion(int cacheId) { + DynamicCacheDescriptor desc = registeredCaches.get(cacheId); + + return desc != null ? desc.localStartVersion() : null; + } + + /** * Callback invoked from discovery thread when discovery message is received. * * @param type Event type. @@ -414,8 +424,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (clientCacheStarted) initAffinity(cacheDesc, cacheCtx.affinity().affinityCache(), fut, lateAffAssign); else if (!req.clientStartOnly()) { - assert fut.topologyVersion().equals(cacheCtx.cacheStartTopologyVersion()); - GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache(); assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion(); @@ -696,7 +704,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param fut Future to add. */ public void addDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) { - GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.key(), fut); + GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.id(), fut); assert old == null : "More than one thread is trying to fetch partition assignments [fut=" + fut + ", allFuts=" + pendingAssignmentFetchFuts + ']'; @@ -706,9 +714,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param fut Future to remove. */ public void removeDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) { - boolean rmv = pendingAssignmentFetchFuts.remove(fut.key(), fut); + boolean rmv = pendingAssignmentFetchFuts.remove(fut.id(), fut); - assert rmv : "Failed to remove assignment fetch future: " + fut.key(); + assert rmv : "Failed to remove assignment fetch future: " + fut.id(); } /** @@ -720,13 +728,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (log.isDebugEnabled()) log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']'); - for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) { - if (fut.key().get1().equals(cacheId)) { - fut.onResponse(nodeId, res); + GridDhtAssignmentFetchFuture fut = pendingAssignmentFetchFuts.get(res.futureId()); - break; - } - } + if (fut != null) + fut.onResponse(nodeId, res); } /** @@ -1001,7 +1006,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap for (int i = 0; i < fetchFuts.size(); i++) { GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i); - Integer cacheId = fetchFut.key().get1(); + Integer cacheId = fetchFut.cacheId(); fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java index 82afdc7..0c97ab0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java @@ -42,9 +42,6 @@ public class CacheData implements Serializable { private final CacheType cacheType; /** */ - private final AffinityTopologyVersion startTopVer; - - /** */ private final IgniteUuid deploymentId; /** */ @@ -66,7 +63,6 @@ public class CacheData implements Serializable { * @param cacheCfg Cache configuration. * @param cacheId Cache ID. * @param cacheType Cache ID. - * @param startTopVer Topology version when cache was started. * @param deploymentId Cache deployment ID. * @param schema Query schema. * @param rcvdFrom Node ID cache was started from. @@ -77,7 +73,6 @@ public class CacheData implements Serializable { CacheData(CacheConfiguration cacheCfg, int cacheId, CacheType cacheType, - AffinityTopologyVersion startTopVer, IgniteUuid deploymentId, QuerySchema schema, UUID rcvdFrom, @@ -86,14 +81,12 @@ public class CacheData implements Serializable { byte flags) { assert cacheCfg != null; assert rcvdFrom != null : cacheCfg.getName(); - assert startTopVer != null : cacheCfg.getName(); assert deploymentId != null : cacheCfg.getName(); assert template || cacheId != 0 : cacheCfg.getName(); this.cacheCfg = cacheCfg; this.cacheId = cacheId; this.cacheType = cacheType; - this.startTopVer = startTopVer; this.deploymentId = deploymentId; this.schema = schema; this.rcvdFrom = rcvdFrom; @@ -110,13 +103,6 @@ public class CacheData implements Serializable { } /** - * @return Start topology version. - */ - public AffinityTopologyVersion startTopologyVersion() { - return startTopVer; - } - - /** * @return {@code True} if this is template configuration. */ public boolean template() { http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 44d41f8..efcf6a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -82,9 +82,6 @@ class ClusterCachesInfo { private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches; /** */ - private Map<UUID, CacheJoinNodeDiscoveryData> joiningNodesDiscoData = new HashMap<>(); - - /** */ private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs; /** @@ -340,19 +337,15 @@ class ClusterCachesInfo { if (needExchange) { req.clientStartOnly(true); - desc.clientCacheStartVersion(topVer.nextMinorVersion()); + desc.localStartVersion(topVer.nextMinorVersion()); exchangeActions.addClientCacheToStart(req, desc); } } if (!needExchange) { - if (desc != null) { - if (desc.clientCacheStartVersion() != null) - waitTopVer = desc.clientCacheStartVersion(); - else - waitTopVer = desc.startTopologyVersion(); - } + if (desc != null) + waitTopVer = desc.localStartVersion(); } } else if (req.globalStateChange()) @@ -404,7 +397,7 @@ class ClusterCachesInfo { for (DynamicCacheDescriptor desc : addedDescs) { assert desc.template() || incMinorTopVer; - desc.startTopologyVersion(startTopVer); + desc.localStartVersion(startTopVer); } } @@ -545,9 +538,11 @@ class ClusterCachesInfo { locJoinStartCaches = new ArrayList<>(); if (!disconnectedState() && joinDiscoData != null) { - processJoiningNode(joinDiscoData, node.id(), topVer); + processJoiningNode(joinDiscoData, node.id()); for (DynamicCacheDescriptor desc : registeredCaches.values()) { + desc.localStartVersion(topVer); + CacheConfiguration cfg = desc.cacheConfiguration(); CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName()); @@ -564,8 +559,7 @@ class ClusterCachesInfo { desc.deploymentId(), desc.schema()); - desc0.startTopologyVersion(desc.startTopologyVersion()); - desc0.clientCacheStartVersion(desc.clientCacheStartVersion()); + desc0.localStartVersion(desc.localStartVersion()); desc0.receivedFrom(desc.receivedFrom()); desc0.staticallyConfigured(desc.staticallyConfigured()); @@ -577,11 +571,15 @@ class ClusterCachesInfo { } } } - else { - CacheJoinNodeDiscoveryData discoData = joiningNodesDiscoData.remove(node.id()); - if (discoData != null) - processJoiningNode(discoData, node.id(), topVer); + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + if (node.id().equals(desc.receivedFrom())) + desc.localStartVersion(topVer); + } + + for (DynamicCacheDescriptor desc : registeredTemplates.values()) { + if (node.id().equals(desc.receivedFrom())) + desc.localStartVersion(topVer); } } } @@ -607,7 +605,6 @@ class ClusterCachesInfo { CacheData cacheData = new CacheData(desc.cacheConfiguration(), desc.cacheId(), desc.cacheType(), - desc.startTopologyVersion(), desc.deploymentId(), desc.schema(), desc.receivedFrom(), @@ -624,7 +621,6 @@ class ClusterCachesInfo { CacheData cacheData = new CacheData(desc.cacheConfiguration(), 0, desc.cacheType(), - desc.startTopologyVersion(), desc.deploymentId(), desc.schema(), desc.receivedFrom(), @@ -659,7 +655,6 @@ class ClusterCachesInfo { cacheData.deploymentId(), cacheData.schema()); - desc.startTopologyVersion(cacheData.startTopologyVersion()); desc.receivedFrom(cacheData.receivedFrom()); desc.staticallyConfigured(cacheData.staticallyConfigured()); @@ -679,7 +674,6 @@ class ClusterCachesInfo { cacheData.deploymentId(), cacheData.schema()); - desc.startTopologyVersion(cacheData.startTopologyVersion()); desc.receivedFrom(cacheData.receivedFrom()); desc.staticallyConfigured(cacheData.staticallyConfigured()); @@ -723,12 +717,8 @@ class ClusterCachesInfo { else processClientReconnectData((CacheClientReconnectDiscoveryData) joiningNodeData, data.joiningNodeId()); } - else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) { - CacheJoinNodeDiscoveryData old = - joiningNodesDiscoData.put(data.joiningNodeId(), (CacheJoinNodeDiscoveryData)joiningNodeData); - - assert old == null : old; - } + else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) + processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId()); } } @@ -754,9 +744,8 @@ class ClusterCachesInfo { /** * @param joinData Joined node discovery data. * @param nodeId Joined node ID. - * @param topVer Topology version. */ - private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId, AffinityTopologyVersion topVer) { + private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) { for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) { CacheConfiguration cfg = cacheInfo.config(); @@ -770,7 +759,6 @@ class ClusterCachesInfo { desc.staticallyConfigured(true); desc.receivedFrom(nodeId); - desc.startTopologyVersion(topVer); DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc); @@ -791,7 +779,6 @@ class ClusterCachesInfo { desc.staticallyConfigured(true); desc.receivedFrom(nodeId); - desc.startTopologyVersion(topVer); DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc); http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index fe859f8..130ebde 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -63,9 +63,6 @@ public class DynamicCacheDescriptor { private boolean updatesAllowed = true; /** */ - private AffinityTopologyVersion startTopVer; - - /** */ private Integer cacheId; /** */ @@ -78,7 +75,7 @@ public class DynamicCacheDescriptor { private volatile CacheObjectContext objCtx; /** */ - private transient AffinityTopologyVersion clientCacheStartVer; + private volatile transient AffinityTopologyVersion locStartVer; /** Mutex to control schema. */ private final Object schemaMux = new Object(); @@ -131,22 +128,6 @@ public class DynamicCacheDescriptor { } /** - * @return Start topology version. - */ - @Nullable public AffinityTopologyVersion startTopologyVersion() { - return startTopVer; - } - - /** - * @param startTopVer Start topology version. - */ - public void startTopologyVersion(AffinityTopologyVersion startTopVer) { - assert startTopVer != null; - - this.startTopVer = startTopVer; - } - - /** * @return {@code True} if this is template configuration. */ public boolean template() { @@ -253,15 +234,15 @@ public class DynamicCacheDescriptor { /** * @return Version when client cache on local node was started. */ - @Nullable AffinityTopologyVersion clientCacheStartVersion() { - return clientCacheStartVer; + @Nullable AffinityTopologyVersion localStartVersion() { + return locStartVer; } /** - * @param clientCacheStartVer Version when client cache on local node was started. + * @param locStartVer Version when cache on local node was started. */ - public void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) { - this.clientCacheStartVer = clientCacheStartVer; + public void localStartVersion(AffinityTopologyVersion locStartVer) { + this.locStartVer = locStartVer; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 8f0d842..2466a59 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -237,9 +237,6 @@ public class GridCacheContext<K, V> implements Externalizable { /** Topology version when cache was started on local node. */ private AffinityTopologyVersion locStartTopVer; - /** Global cache start topology version. */ - private AffinityTopologyVersion cacheStartTopVer; - /** Dynamic cache deployment ID. */ private IgniteUuid dynamicDeploymentId; @@ -292,7 +289,6 @@ public class GridCacheContext<K, V> implements Externalizable { GridCacheSharedContext sharedCtx, CacheConfiguration cacheCfg, CacheType cacheType, - AffinityTopologyVersion cacheStartTopVer, AffinityTopologyVersion locStartTopVer, boolean affNode, boolean updatesAllowed, @@ -321,7 +317,6 @@ public class GridCacheContext<K, V> implements Externalizable { assert ctx != null; assert sharedCtx != null; assert cacheCfg != null; - assert cacheStartTopVer != null : cacheCfg.getName(); assert locStartTopVer != null : cacheCfg.getName(); assert evtMgr != null; @@ -341,7 +336,6 @@ public class GridCacheContext<K, V> implements Externalizable { this.cacheCfg = cacheCfg; this.cacheType = cacheType; this.locStartTopVer = locStartTopVer; - this.cacheStartTopVer = cacheStartTopVer; this.affNode = affNode; this.updatesAllowed = updatesAllowed; this.depEnabled = ctx.deploy().enabled() && !cacheObjects().isBinaryEnabled(cacheCfg); @@ -470,15 +464,6 @@ public class GridCacheContext<K, V> implements Externalizable { } /** - * @return Cache start topology version. - */ - public AffinityTopologyVersion cacheStartTopologyVersion() { - assert cacheStartTopVer != null : name(); - - return cacheStartTopVer; - } - - /** * @return Cache default {@link ExpiryPolicy}. */ @Nullable public ExpiryPolicy expiry() { http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 5e7e401..348d9d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -146,8 +146,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) { assert cacheMsg.topologyVersion() != null : cacheMsg; - AffinityTopologyVersion waitVer = - ((GridDhtAffinityAssignmentRequest)cacheMsg).waitTopologyVersion(); + AffinityTopologyVersion waitVer = cctx.affinity().localStartVersion(cacheMsg.cacheId()); + + if (waitVer == null) + waitVer = new AffinityTopologyVersion(cctx.localNode().order()); // Need to wait for exchange to avoid race between cache start and affinity request. fut = cctx.exchange().affinityReadyFuture(waitVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 8f52ae6..04c647f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -895,7 +895,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean ready; if (exchId != null) { - AffinityTopologyVersion startTopVer = cacheCtx.cacheStartTopologyVersion(); + AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion(); ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0; } @@ -1301,7 +1301,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); if (cacheCtx != null && - cacheCtx.cacheStartTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0) + cacheCtx.startTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0) continue; GridDhtPartitionTopology top = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 82db451..f9b015d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1345,7 +1345,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { private GridCacheContext createCache(CacheConfiguration<?, ?> cfg, @Nullable CachePluginManager pluginMgr, CacheType cacheType, - AffinityTopologyVersion cacheStartTopVer, AffinityTopologyVersion locStartTopVer, CacheObjectContext cacheObjCtx, boolean affNode, @@ -1420,7 +1419,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx, cfg, cacheType, - cacheStartTopVer, locStartTopVer, affNode, updatesAllowed, @@ -1553,7 +1551,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx, cfg, cacheType, - cacheStartTopVer, locStartTopVer, affNode, true, @@ -1733,7 +1730,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { nearCfg, cacheDesc.cacheType(), cacheDesc.deploymentId(), - cacheDesc.startTopologyVersion(), exchTopVer, cacheDesc.schema() ); @@ -1755,7 +1751,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { t.get2(), desc.cacheType(), desc.deploymentId(), - desc.startTopologyVersion(), exchTopVer, desc.schema() ); @@ -1785,7 +1780,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { null, desc.cacheType(), desc.deploymentId(), - desc.startTopologyVersion(), exchTopVer, desc.schema() ); @@ -1801,7 +1795,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param reqNearCfg Near configuration if specified for client cache start request. * @param cacheType Cache type. * @param deploymentId Deployment ID. - * @param cacheStartTopVer Cache start topology version. * @param exchTopVer Current exchange version. * @param schema Query schema. * @throws IgniteCheckedException If failed. @@ -1811,7 +1804,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { @Nullable NearCacheConfiguration reqNearCfg, CacheType cacheType, IgniteUuid deploymentId, - AffinityTopologyVersion cacheStartTopVer, AffinityTopologyVersion exchTopVer, @Nullable QuerySchema schema ) throws IgniteCheckedException { @@ -1839,7 +1831,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, - cacheStartTopVer, exchTopVer, cacheObjCtx, affNode, http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java index 0b3080e..f80adc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java @@ -31,12 +31,12 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { /** */ private static final long serialVersionUID = 0L; + /** */ + private long futId; + /** Topology version being queried. */ private AffinityTopologyVersion topVer; - /** */ - private AffinityTopologyVersion waitTopVer; - /** * Empty constructor. */ @@ -45,26 +45,26 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { } /** + * @param futId Future ID. * @param cacheId Cache ID. * @param topVer Topology version. - * @param waitTopVer Topology version to wait for before message processing. */ - public GridDhtAffinityAssignmentRequest(int cacheId, - AffinityTopologyVersion topVer, - AffinityTopologyVersion waitTopVer) { + public GridDhtAffinityAssignmentRequest( + long futId, + int cacheId, + AffinityTopologyVersion topVer) { assert topVer != null; - assert waitTopVer != null; + this.futId = futId; this.cacheId = cacheId; this.topVer = topVer; - this.waitTopVer = waitTopVer; } /** - * @return Topology version to wait for before message processing. + * @return Future ID. */ - public AffinityTopologyVersion waitTopologyVersion() { - return waitTopVer; + public long futureId() { + return futId; } /** {@inheritDoc} */ @@ -110,13 +110,13 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { switch (writer.state()) { case 3: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeLong("futId", futId)) return false; writer.incrementState(); case 4: - if (!writer.writeMessage("waitTopVer", waitTopVer)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); @@ -138,7 +138,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { switch (reader.state()) { case 3: - topVer = reader.readMessage("topVer"); + futId = reader.readLong("futId"); if (!reader.isLastRead()) return false; @@ -146,7 +146,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { reader.incrementState(); case 4: - waitTopVer = reader.readMessage("waitTopVer"); + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java index e8094e1..5d82171 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java @@ -44,6 +44,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { /** */ private static final long serialVersionUID = 0L; + /** */ + private long futId; + /** Topology version. */ private AffinityTopologyVersion topVer; @@ -69,19 +72,30 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { } /** + * @param futId Future ID. * @param cacheId Cache ID. * @param topVer Topology version. * @param affAssignment Affinity assignment. */ - public GridDhtAffinityAssignmentResponse(int cacheId, + public GridDhtAffinityAssignmentResponse( + long futId, + int cacheId, @NotNull AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) { + this.futId = futId; this.cacheId = cacheId; this.topVer = topVer; affAssignmentIds = ids(affAssignment); } + /** + * @return Future ID. + */ + public long futureId() { + return futId; + } + /** {@inheritDoc} */ @Override public boolean partitionExchangeMessage() { return true; @@ -181,7 +195,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 6; + return 7; } /** @@ -239,12 +253,18 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { writer.incrementState(); case 4: - if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes)) + if (!writer.writeLong("futId", futId)) return false; writer.incrementState(); case 5: + if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes)) + return false; + + writer.incrementState(); + + case 6: if (!writer.writeMessage("topVer", topVer)) return false; @@ -275,7 +295,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { reader.incrementState(); case 4: - idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes"); + futId = reader.readLong("futId"); if (!reader.isLastRead()) return false; @@ -283,6 +303,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { reader.incrementState(); case 5: + idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java index 1d6563e..741ca5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.Queue; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -36,7 +37,6 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -55,6 +55,9 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin private static IgniteLogger log; /** */ + private static final AtomicLong idGen = new AtomicLong(); + + /** */ private final GridCacheSharedContext ctx; /** List of available nodes this future can fetch data from. */ @@ -65,11 +68,13 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin private ClusterNode pendingNode; /** */ - @GridToStringInclude - private final T2<Integer, AffinityTopologyVersion> key; + private final long id; /** */ - private final DynamicCacheDescriptor cacheDesc; + private final AffinityTopologyVersion topVer; + + /** */ + private final int cacheId; /** * @param ctx Context. @@ -83,9 +88,11 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin AffinityTopologyVersion topVer, DiscoCache discoCache ) { + this.topVer = topVer; + this.cacheId = cacheDesc.cacheId(); this.ctx = ctx; - this.cacheDesc = cacheDesc; - this.key = new T2<>(cacheDesc.cacheId(), topVer); + + id = idGen.getAndIncrement(); Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheDesc.cacheId()); @@ -105,19 +112,26 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin } /** - * Initializes fetch future. + * @return Cache ID. */ - public void init() { - ctx.affinity().addDhtAssignmentFetchFuture(this); + public int cacheId() { + return cacheId; + } - requestFromNextNode(); + /** + * @return Future ID. + */ + public long id() { + return id; } /** - * @return Future key. + * Initializes fetch future. */ - public T2<Integer, AffinityTopologyVersion> key() { - return key; + public void init() { + ctx.affinity().addDhtAssignmentFetchFuture(this); + + requestFromNextNode(); } /** @@ -125,14 +139,6 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin * @param res Response. */ public void onResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) { - if (!res.topologyVersion().equals(key.get2())) { - if (log.isDebugEnabled()) - log.debug("Received affinity assignment for wrong topology version (will ignore) " + - "[node=" + nodeId + ", res=" + res + ", topVer=" + key.get2() + ']'); - - return; - } - GridDhtAffinityAssignmentResponse res0 = null; synchronized (this) { @@ -189,7 +195,7 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin ", node=" + node + ']'); ctx.io().send(node, - new GridDhtAffinityAssignmentRequest(key.get1(), key.get2(), cacheDesc.startTopologyVersion()), + new GridDhtAffinityAssignmentRequest(id, cacheId, topVer), AFFINITY_POOL); // Close window for listener notification. http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 5b3dfc6..58ad600 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -393,7 +393,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { int num = cctx.affinity().partitions(); if (cctx.rebalanceEnabled()) { - boolean added = exchId.topologyVersion().equals(cctx.cacheStartTopologyVersion()); + boolean added = exchId.topologyVersion().equals(cctx.startTopologyVersion()); boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added; @@ -1157,7 +1157,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { // then we keep the newer value. if (newPart != null && (newPart.updateSequence() < part.updateSequence() || - (cctx.cacheStartTopologyVersion().compareTo(newPart.topologyVersion()) > 0)) + (cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0)) ) { if (log.isDebugEnabled()) log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" + http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 9f1b96e..57616ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -608,7 +608,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { AffinityAssignment assignment = cctx.affinity().assignment(topVer); - GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(), + GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse( + req.futureId(), + cctx.cacheId(), topVer, assignment.assignment()); http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 9fe29ef..acf351f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -345,7 +345,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { assert jcache != null : "Failed to get cache proxy [name=" + cctx.name() + ", locStart=" + cctx.startTopologyVersion() + - ", cacheStart=" + cctx.cacheStartTopologyVersion() + ", locNode=" + cctx.localNode() + ", stopping=" + cctx.kernalContext().isStopping(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d24b08b6/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index 6149586..4f0d9a1 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -80,7 +80,6 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { defaultCacheConfiguration(), CacheType.USER, AffinityTopologyVersion.ZERO, - AffinityTopologyVersion.ZERO, true, true, null,
