Repository: ignite Updated Branches: refs/heads/ignite-5272-1 [created] f6e44b6cf
ignite-5272 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f6e44b6c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f6e44b6c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f6e44b6c Branch: refs/heads/ignite-5272-1 Commit: f6e44b6cf21c4ce3d8c4f996399ec96197a8ffc3 Parents: 70eea2a Author: sboikov <[email protected]> Authored: Tue Jun 6 13:48:51 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Jun 6 13:48:51 2017 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 15 + .../cache/CacheAffinitySharedManager.java | 281 ++++++++++++++++--- .../processors/cache/CacheGroupContext.java | 3 + .../ClientCacheChangeDiscoveryMessage.java | 101 +++++++ .../processors/cache/GridCacheContext.java | 9 +- .../GridCachePartitionExchangeManager.java | 7 +- .../processors/cache/GridCacheProcessor.java | 135 ++++++--- .../dht/ClientCacheDhtTopologyFuture.java | 58 ++++ .../dht/GridClientPartitionTopology.java | 41 ++- .../dht/GridDhtAffinityAssignmentRequest.java | 40 ++- .../dht/GridDhtAffinityAssignmentResponse.java | 66 ++++- .../dht/GridDhtAssignmentFetchFuture.java | 19 +- .../dht/GridDhtPartitionTopology.java | 12 +- .../dht/GridDhtPartitionTopologyImpl.java | 50 ++-- .../GridDhtPartitionsExchangeFuture.java | 30 +- 15 files changed, 689 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e44b6c/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index e144d9a..cf9abfe 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -70,6 +70,8 @@ import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; +import org.apache.ignite.internal.processors.cache.ClientCacheChangeDiscoveryMessage; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics; import org.apache.ignite.internal.processors.security.SecurityContext; @@ -2001,6 +2003,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * @param reqId Start request ID. + * @param reqs Cache start requests. + */ + public void clientCacheStartEvent(UUID reqId, Map<String, DynamicCacheChangeRequest> reqs) { + discoWrk.addEvent(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, + AffinityTopologyVersion.NONE, + localNode(), + null, + Collections.<ClusterNode>emptyList(), + new ClientCacheChangeDiscoveryMessage(reqId, reqs)); + } + + /** * Gets first grid node start time, see {@link DiscoverySpi#getGridStartTime()}. * * @return Start time of the first grid node. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e44b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index d8ca177..f8ca6f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; @@ -36,13 +37,16 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; +import org.apache.ignite.internal.processors.cache.distributed.dht.ClientCacheDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -81,7 +85,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private AffinityTopologyVersion lastAffVer; /** Registered caches (updated from exchange thread). */ - private final Map<Integer, CacheGroupDescriptor> registeredGrps = new HashMap<>(); + private final CachesInfo caches = new CachesInfo(); /** */ private WaitRebalanceInfo waitInfo; @@ -131,14 +135,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { if (type == EVT_NODE_JOINED && node.isLocal()) { // Clean-up in case of client reconnect. - registeredGrps.clear(); + caches.clear(); affCalcVer = null; lastAffVer = null; - for (CacheGroupDescriptor desc : cctx.cache().cacheGroupDescriptors().values()) - registeredGrps.put(desc.groupId(), desc); + caches.init(cctx.cache().cacheGroupDescriptors(), cctx.cache().cacheDescriptors()); } if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) { @@ -327,17 +330,111 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param exchActions Cache change requests to execute on exchange. */ private void updateCachesInfo(ExchangeActions exchActions) { - for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) { - CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId()); + caches.updateCachesInfo(exchActions); + } + + /** + * @param msg Message. 
+ */ + void startClientCaches(ClientCacheChangeDiscoveryMessage msg) { + assert msg.discoCache() != null : msg; + + List<DynamicCacheDescriptor> startDescs = new ArrayList<>(msg.requests().size()); + + for (DynamicCacheChangeRequest startReq : msg.requests().values()) { + DynamicCacheDescriptor desc = caches.cache(startReq.cacheName()); + + if (desc == null) { + CacheException err = new CacheException("Failed to start client cache " + + "(a cache with the given name is not started): " + startReq.cacheName()); + + cctx.cache().completeClientCacheStartFuture(msg.requestId(), err); + + return; + } + + if (cctx.cacheContext(desc.cacheId()) != null) + continue; - assert rmvd != null : stopDesc.cacheOrGroupName(); + startDescs.add(desc); } - for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) { - CacheGroupDescriptor old = registeredGrps.put(startDesc.groupId(), startDesc); + AffinityTopologyVersion topVer = cctx.exchange().readyAffinityVersion(); - assert old == null : old; + Map<Integer, GridDhtAssignmentFetchFuture> fetchFuts = U.newHashMap(startDescs.size()); + + for (DynamicCacheDescriptor desc : startDescs) { + try { + DynamicCacheChangeRequest startReq = msg.requests().get(desc.cacheName()); + + cctx.cache().prepareCacheStart(desc, startReq.nearCacheConfiguration(), topVer); + + CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId()); + + assert grp != null : desc.groupId(); + assert !grp.affinityNode() : grp.cacheOrGroupName(); + + if (!grp.isLocal() && + grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) && + grp.localStartVersion().equals(topVer) && + !fetchFuts.containsKey(grp.groupId())) { + GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, + grp.groupId(), + topVer, + msg.discoCache()); + + fetchFut.init(true); + + fetchFuts.put(grp.groupId(), fetchFut); + } + } + catch (IgniteCheckedException e) { + cctx.cache().completeClientCacheStartFuture(msg.requestId(), e); + + return; + } + } + + 
for (GridDhtAssignmentFetchFuture fetchFut : fetchFuts.values()) { + try { + CacheGroupContext grp = cctx.cache().cacheGroup(fetchFut.groupId()); + + assert grp != null; + + GridDhtAffinityAssignmentResponse res = fetchAffinity(topVer, + null, + msg.discoCache(), + grp.affinity(), + fetchFut); + + GridDhtPartitionFullMap partMap; + + if (res != null) { + partMap = res.partitionMap(); + + assert partMap != null : res; + + ClientCacheDhtTopologyFuture topFut = new ClientCacheDhtTopologyFuture(topVer); + + grp.topology().updateTopologyVersion(topVer, topFut, msg.discoCache(), -1, false); + + grp.topology().update(topVer, partMap, null, null); + } + else { + // TODO 5272: mark as 'no server nodes' + } + } + catch (IgniteCheckedException e) { + // TODO 5272: stop already started caches. + cctx.cache().completeClientCacheStartFuture(msg.requestId(), e); + + return; + } } + + cctx.cache().initCacheProxies(topVer, null); + + cctx.cache().completeClientCacheStartFuture(msg.requestId(), null); } /** @@ -419,7 +516,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.topologyVersion())) { assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion(); - initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut); + initAffinity(caches.group(grp.groupId()), grp.affinity(), fut); } } } @@ -465,7 +562,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert !grpHolder.client() : grpHolder; grpHolder = CacheGroupHolder2.create(cctx, - registeredGrps.get(grp.groupId()), + caches.group(grp.groupId()), fut, grp.affinity()); @@ -548,7 +645,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap public void removeAllCacheInfo() { grpHolders.clear(); - registeredGrps.clear(); + caches.clear(); } /** @@ -638,7 +735,7 @@ public class CacheAffinitySharedManager<K, V> 
extends GridCacheSharedManagerAdap assert affTopVer.topologyVersion() > 0 : affTopVer; - CacheGroupDescriptor desc = registeredGrps.get(aff.groupId()); + CacheGroupDescriptor desc = caches.group(aff.groupId()); assert desc != null : aff.cacheOrGroupName(); @@ -768,7 +865,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private void forAllRegisteredCacheGroups(IgniteInClosureX<CacheGroupDescriptor> c) throws IgniteCheckedException { assert lateAffAssign; - for (CacheGroupDescriptor cacheDesc : registeredGrps.values()) { + for (CacheGroupDescriptor cacheDesc : caches.allGroups()) { if (cacheDesc.config().getCacheMode() == LOCAL) continue; @@ -848,12 +945,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap public void initStartedCaches(boolean crd, final GridDhtPartitionsExchangeFuture fut, Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException { - for (DynamicCacheDescriptor desc : descs) { - CacheGroupDescriptor grpDesc = desc.groupDescriptor(); - - if (!registeredGrps.containsKey(grpDesc.groupId())) - registeredGrps.put(grpDesc.groupId(), grpDesc); - } + caches.initStartedCaches(descs); if (crd && lateAffAssign) { forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { @@ -873,7 +965,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) - initAffinity(registeredGrps.get(aff.groupId()), aff, fut); + initAffinity(caches.group(aff.groupId()), aff, fut); } }); } @@ -898,13 +990,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } else { GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, - desc, + desc.groupId(), fut.topologyVersion(), 
fut.discoCache()); - fetchFut.init(); + fetchFut.init(false); - fetchAffinity(fut, aff, fetchFut); + fetchAffinity(fut.topologyVersion(), + fut.discoveryEvent(), + fut.discoCache(), + aff, fetchFut); } } @@ -995,7 +1090,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap StringBuilder names = new StringBuilder(); for (Integer grpId : grpIds) { - String name = registeredGrps.get(grpId).cacheOrGroupName(); + String name = caches.group(grpId).cacheOrGroupName(); if (names.length() != 0) names.append(", "); @@ -1027,16 +1122,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap grp.affinity().initialize(fut.topologyVersion(), assignment); } else { - CacheGroupDescriptor grpDesc = registeredGrps.get(grp.groupId()); + CacheGroupDescriptor grpDesc = caches.group(grp.groupId()); assert grpDesc != null : grp.cacheOrGroupName(); GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, - grpDesc, + grpDesc.groupId(), topVer, fut.discoCache()); - fetchFut.init(); + fetchFut.init(false); fetchFuts.add(fetchFut); } @@ -1047,48 +1142,57 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap Integer grpId = fetchFut.groupId(); - fetchAffinity(fut, cctx.cache().cacheGroup(grpId).affinity(), fetchFut); + fetchAffinity(fut.topologyVersion(), + fut.discoveryEvent(), + fut.discoCache(), + cctx.cache().cacheGroup(grpId).affinity(), + fetchFut); } } /** - * @param fut Exchange future. + * @param topVer Topology version. + * @param discoveryEvt Discovery event. + * @param discoCache Discovery data cache. * @param affCache Affinity. * @param fetchFut Affinity fetch future. * @throws IgniteCheckedException If failed. + * @return Affinity assignment response. 
*/ - private void fetchAffinity(GridDhtPartitionsExchangeFuture fut, + private GridDhtAffinityAssignmentResponse fetchAffinity(AffinityTopologyVersion topVer, + @Nullable DiscoveryEvent discoveryEvt, + DiscoCache discoCache, GridAffinityAssignmentCache affCache, GridDhtAssignmentFetchFuture fetchFut) throws IgniteCheckedException { assert affCache != null; - AffinityTopologyVersion topVer = fut.topologyVersion(); - GridDhtAffinityAssignmentResponse res = fetchFut.get(); if (res == null) { - List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache()); + List<List<ClusterNode>> aff = affCache.calculate(topVer, discoveryEvt, discoCache); affCache.initialize(topVer, aff); } else { - List<List<ClusterNode>> idealAff = res.idealAffinityAssignment(cctx.discovery()); + List<List<ClusterNode>> idealAff = res.idealAffinityAssignment(discoCache); if (idealAff != null) affCache.idealAssignment(idealAff); else { assert !affCache.centralizedAffinityFunction() || !lateAffAssign; - affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache()); + affCache.calculate(topVer, discoveryEvt, discoCache); } - List<List<ClusterNode>> aff = res.affinityAssignment(cctx.discovery()); + List<List<ClusterNode>> aff = res.affinityAssignment(discoCache); assert aff != null : res; affCache.initialize(topVer, aff); } + + return res; } /** @@ -1141,7 +1245,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (grp.isLocal()) continue; - initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut); + initAffinity(caches.group(grp.groupId()), grp.affinity(), fut); } } @@ -1203,18 +1307,21 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev; GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, - desc, + desc.groupId(), prev.topologyVersion(), prev.discoCache()); - fetchFut.init(); + 
fetchFut.init(false); final GridFutureAdapter<AffinityTopologyVersion> affFut = new GridFutureAdapter<>(); fetchFut.listen(new IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>() { @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut) throws IgniteCheckedException { - fetchAffinity(prev, aff, (GridDhtAssignmentFetchFuture)fetchFut); + fetchAffinity(prev.topologyVersion(), + prev.discoveryEvent(), + prev.discoCache(), + aff, (GridDhtAssignmentFetchFuture)fetchFut); aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); @@ -1877,7 +1984,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cacheWaitParts == null) { waitGrps.put(grpId, cacheWaitParts = new HashMap<>()); - deploymentIds.put(grpId, registeredGrps.get(grpId).deploymentId()); + deploymentIds.put(grpId, caches.group(grpId).deploymentId()); } cacheWaitParts.put(part, waitNode); @@ -1896,4 +2003,92 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ", grps=" + (waitGrps != null ? waitGrps.keySet() : null) + ']'; } } + + /** + * + */ + static class CachesInfo { + /** Registered caches (updated from exchange thread). */ + private final ConcurrentHashMap<Integer, CacheGroupDescriptor> registeredGrps = new ConcurrentHashMap<>(); + + /** */ + private final ConcurrentHashMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>(); + + /** + * @param grps Registered groups. + * @param caches Registered caches. + */ + void init(Map<Integer, CacheGroupDescriptor> grps, Map<String, DynamicCacheDescriptor> caches) { + for (CacheGroupDescriptor grp : grps.values()) + registeredGrps.put(grp.groupId(), grp); + + for (DynamicCacheDescriptor cache : caches.values()) + registeredCaches.put(cache.cacheName(), cache); + } + + /** + * @return All registered groups. 
+ */ + Collection<CacheGroupDescriptor> allGroups() { + return registeredGrps.values(); + } + + /** + * @param grpId Group ID. + * @return Group descriptor. + */ + CacheGroupDescriptor group(int grpId) { + CacheGroupDescriptor desc = registeredGrps.get(grpId); + + assert desc != null : grpId; + + return desc; + } + + /** + * @param descs Cache descriptor. + */ + void initStartedCaches(Collection<DynamicCacheDescriptor> descs) { + for (DynamicCacheDescriptor desc : descs) { + CacheGroupDescriptor grpDesc = desc.groupDescriptor(); + + if (!registeredGrps.containsKey(grpDesc.groupId())) + registeredGrps.put(grpDesc.groupId(), grpDesc); + } + } + + /** + * @param exchActions Exchange actions. + */ + void updateCachesInfo(ExchangeActions exchActions) { + for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) { + CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId()); + + assert rmvd != null : stopDesc.cacheOrGroupName(); + } + + for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) { + CacheGroupDescriptor old = registeredGrps.put(startDesc.groupId(), startDesc); + + assert old == null : old; + } + } + + /** + * @param cacheName Cache name. + * @return Cache descriptor if cache found. 
+ */ + @Nullable DynamicCacheDescriptor cache(String cacheName) { + return registeredCaches.get(cacheName); + } + + /** + * + */ + void clear() { + registeredGrps.clear(); + + registeredCaches.clear(); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e44b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index b85c41d..20793c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -919,6 +919,9 @@ public class CacheGroupContext { res.idealAffinityAssignment(assignment.idealAssignment()); } + if (req.sendPartitionsState()) + res.partitionMap(top.partitionMap(true)); + try { ctx.io().send(nodeId, res, AFFINITY_POOL); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e44b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java new file mode 100644 index 0000000..58173cc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * Dummy discovery message which is not really sent via ring, it is just added in local discovery worker queue. + */ +public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage, CachePartitionExchangeWorkerTask { + /** */ + private final UUID reqId; + + /** */ + private final Map<String, DynamicCacheChangeRequest> reqs; + + /** */ + private DiscoCache discoCache; + + /** + * @param reqId Start request ID. + * @param reqs Caches start requests. + */ + public ClientCacheChangeDiscoveryMessage(UUID reqId, Map<String, DynamicCacheChangeRequest> reqs) { + assert reqId != null; + assert !F.isEmpty(reqs); + + this.reqId = reqId; + this.reqs = reqs; + } + + /** + * @return Discovery data cache. + */ + DiscoCache discoCache() { + return discoCache; + } + + /** + * @param discoCache Discovery data cache. 
+ */ + void discoCache(DiscoCache discoCache) { + this.discoCache = discoCache; + } + + /** + * @return Start request ID. + */ + UUID requestId() { + return reqId; + } + + /** + * @return Cache start requests. + */ + Map<String, DynamicCacheChangeRequest> requests() { + return reqs; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClientCacheChangeDiscoveryMessage.class, this, "caches", reqs.keySet()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e44b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 44cf4e0..d2b41d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -829,14 +829,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @return Partition topology. 
*/ public GridDhtPartitionTopology topology() { - GridCacheAdapter<K, V> cache = this.cache; - - if (cache == null) - throw new IllegalStateException("Cache stopped: " + cacheName); - - assert cache.isNear() || cache.isDht() || cache.isColocated() || cache.isDhtAtomic() : cache; - - return topology(cache); + return grp.topology(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e44b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index ae4c164..3cd5b39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -266,9 +266,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana exchFut = exchangeFuture(exchId, evt, null, null, null); } else { + if (cache == null) + cache = cctx.discovery().discoCache(readyAffinityVersion()); + // Process event as custom discovery task if needed. 
CachePartitionExchangeWorkerTask task = - cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg); + cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg, cache); if (task != null) exchWorker.addCustomTask(task); @@ -1688,7 +1691,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cctx.cache().processCustomExchangeTask(task); } catch (Exception e) { - U.warn(log, "Failed to process custom exchange task: " + task, e); + U.error(log, "Failed to process custom exchange task: " + task, e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e44b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 577bfb9..33d9e92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -68,6 +68,7 @@ import org.apache.ignite.internal.IgniteTransactionsEx; import org.apache.ignite.internal.binary.BinaryContext; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.binary.GridBinaryMarshaller; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage; import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; @@ -356,15 +357,24 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Create exchange worker task for custom discovery message. * * @param msg Custom discovery message. + * @param discoCache Discovery data cache. 
* @return Task or {@code null} if message doesn't require any special processing. */ - public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) { + public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg, + DiscoCache discoCache) { if (msg instanceof SchemaAbstractDiscoveryMessage) { SchemaAbstractDiscoveryMessage msg0 = (SchemaAbstractDiscoveryMessage)msg; if (msg0.exchange()) return new SchemaExchangeWorkerTask(msg0); } + else if (msg instanceof ClientCacheChangeDiscoveryMessage) { + ClientCacheChangeDiscoveryMessage msg0 = (ClientCacheChangeDiscoveryMessage)msg; + + msg0.discoCache(discoCache); + + return msg0; + } return null; } @@ -391,6 +401,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.query().onNodeLeave(task0.node()); } + else if (task instanceof ClientCacheChangeDiscoveryMessage) { + ClientCacheChangeDiscoveryMessage task0 = (ClientCacheChangeDiscoveryMessage)task; + + sharedCtx.affinity().startClientCaches(task0); + } else U.warn(log, "Unsupported custom exchange task: " + task); } @@ -2086,6 +2101,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { onExchangeDone(topVer, actions, err, true); } + void initCacheProxies(AffinityTopologyVersion startTopVer, Throwable err) { + for (GridCacheAdapter<?, ?> cache : caches.values()) { + GridCacheContext<?, ?> cacheCtx = cache.context(); + + if (cacheCtx.startTopologyVersion().equals(startTopVer) && !jCacheProxies.containsKey(cacheCtx.name())) { + jCacheProxies.putIfAbsent(cacheCtx.name(), new IgniteCacheProxy(cache.context(), cache, null, false)); + + if (cacheCtx.preloader() != null) + cacheCtx.preloader().onInitialExchangeComplete(err); + } + } + } + /** * Callback invoked when first exchange future for dynamic cache is completed. 
* @@ -2100,16 +2128,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { Throwable err, boolean forceClose ) { - for (GridCacheAdapter<?, ?> cache : caches.values()) { - GridCacheContext<?, ?> cacheCtx = cache.context(); - - if (cacheCtx.startTopologyVersion().equals(topVer)) { - jCacheProxies.putIfAbsent(cacheCtx.name(), new IgniteCacheProxy(cache.context(), cache, null, false)); - - if (cacheCtx.preloader() != null) - cacheCtx.preloader().onInitialExchangeComplete(err); - } - } + initCacheProxies(topVer, err); if (exchActions != null && (err == null || forceClose)) { Collection<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGrps = null; @@ -2234,6 +2253,17 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param reqId Request ID. + * @param err Error if any. + */ + void completeClientCacheStartFuture(UUID reqId, @Nullable Exception err) { + DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(reqId); + + if (fut != null) + fut.onDone(false, err); + } + + /** * Creates shared context. * * @param kernalCtx Kernal context. @@ -2550,8 +2580,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { failIfExists, failIfNotStarted); - if (req != null) + if (req != null) { + if (req.clientStartOnly()) + return startClientCaches(F.asMap(req.cacheName(), req)); + return F.first(initiateCacheChanges(F.asList(req), failIfExists)); + } else return new GridFinishedFuture<>(); } @@ -2561,6 +2595,27 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param reqs Requests. + * @return Start future. 
+ */ + private IgniteInternalFuture<Boolean> startClientCaches(Map<String, DynamicCacheChangeRequest> reqs) { + DynamicCacheStartFuture fut = new DynamicCacheStartFuture(UUID.randomUUID()); + + IgniteInternalFuture old = pendingFuts.put(fut.id, fut); + + assert old == null; + + ctx.discovery().clientCacheStartEvent(fut.id, reqs); + + IgniteCheckedException err = checkNodeState(); + + if (err != null) + fut.onDone(err); + + return fut; + } + + /** * Dynamically starts multiple caches. * * @param ccfgList Collection of cache configuration. @@ -2833,7 +2888,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { Collection<DynamicCacheChangeRequest> sndReqs = new ArrayList<>(reqs.size()); for (DynamicCacheChangeRequest req : reqs) { - DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req); + DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.requestId()); try { if (req.stop() || req.close()) { @@ -2890,14 +2945,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { try { ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs)); - if (ctx.isStopping()) { - err = new IgniteCheckedException("Failed to execute dynamic cache change request, " + - "node is stopping."); - } - else if (ctx.clientDisconnected()) { - err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), - "Failed to execute dynamic cache change request, client node disconnected."); - } + err = checkNodeState(); } catch (IgniteCheckedException e) { err = e; @@ -2913,6 +2961,22 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @return Non null exception if node is stopping or disconnected. 
+ */ + @Nullable private IgniteCheckedException checkNodeState() { + if (ctx.isStopping()) { + return new IgniteCheckedException("Failed to execute dynamic cache change request, " + + "node is stopping."); + } + else if (ctx.clientDisconnected()) { + return new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), + "Failed to execute dynamic cache change request, client node disconnected."); + } + + return null; + } + + /** * @param type Event type. * @param node Event node. * @param topVer Topology version. @@ -3225,13 +3289,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (log.isDebugEnabled()) log.debug("Getting public cache for name: " + cacheName); - IgniteCacheProxy<?, ?> cache = jCacheProxies.get(cacheName); - DynamicCacheDescriptor desc = cacheDescriptor(cacheName); if (desc != null && !desc.cacheType().userCache()) throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName); + IgniteCacheProxy<?, ?> cache = jCacheProxies.get(cacheName); + if (cache == null) { dynamicStartCache(null, cacheName, null, false, failIfNotStarted, checkThreadTx).get(); @@ -3800,33 +3864,20 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") private class DynamicCacheStartFuture extends GridFutureAdapter<Boolean> { - /** Cache name. */ - private String cacheName; - - /** Change request. */ - @GridToStringInclude - private DynamicCacheChangeRequest req; - - /** - * @param cacheName Cache name. - * @param req Cache start request. - */ - private DynamicCacheStartFuture(String cacheName, DynamicCacheChangeRequest req) { - this.cacheName = cacheName; - this.req = req; - } + /** */ + private UUID id; /** - * @return Request. + * @param id Future ID. 
*/ - public DynamicCacheChangeRequest request() { - return req; + private DynamicCacheStartFuture(UUID id) { + this.id = id; } /** {@inheritDoc} */ @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) { // Make sure to remove future before completion. - pendingFuts.remove(req.requestId(), this); + pendingFuts.remove(id, this); return super.onDone(res, err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e44b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java new file mode 100644 index 0000000..d374f29 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.Collection; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class ClientCacheDhtTopologyFuture extends GridFinishedFuture<AffinityTopologyVersion> + implements GridDhtTopologyFuture { + /** + * @param topVer Exchange topology version. + */ + public ClientCacheDhtTopologyFuture(AffinityTopologyVersion topVer) { + super(topVer); + + assert topVer != null; + } + + /** {@inheritDoc} */ + @Override public AffinityTopologyVersion topologyVersion() { + return result(); + } + + /** {@inheritDoc} */ + @Nullable @Override public Throwable validateCache(GridCacheContext cctx, + boolean recovery, + boolean read, + @Nullable Object key, + @Nullable Collection<?> keys) { + return null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "ClientCacheDhtTopologyFuture [topVer=" + topologyVersion() + ']'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e44b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index cace4e8..7cfff92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -85,7 +85,7 @@ public class 
GridClientPartitionTopology implements GridDhtPartitionTopology { private final Map<Integer, Set<UUID>> part2node = new HashMap<>(); /** */ - private GridDhtPartitionExchangeId lastExchangeId; + private AffinityTopologyVersion lastExchangeVer; /** */ private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; @@ -185,21 +185,22 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public void updateTopologyVersion( - GridDhtPartitionExchangeId exchId, - GridDhtPartitionsExchangeFuture exchFut, + AffinityTopologyVersion exchTopVer, + GridDhtTopologyFuture exchFut, + DiscoCache discoCache, long updSeq, boolean stopping ) throws IgniteInterruptedCheckedException { U.writeLock(lock); try { - assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer + - ", exchId=" + exchId + ']'; + assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer + + ", exchVer=" + exchTopVer + ']'; this.stopping = stopping; - topVer = exchId.topologyVersion(); - discoCache = exchFut.discoCache(); + topVer = exchTopVer; + this.discoCache = discoCache; updateSeq.setIfGreater(updSeq); @@ -569,23 +570,21 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update( - @Nullable GridDhtPartitionsExchangeFuture exchFut, + @Nullable AffinityTopologyVersion exchVer, GridDhtPartitionFullMap partMap, Map<Integer, T2<Long, Long>> cntrMap, Set<Integer> partsToReload ) { - GridDhtPartitionExchangeId exchId = exchFut != null ? 
exchFut.exchangeId() : null; - if (log.isDebugEnabled()) - log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); + log.debug("Updating full partition map [exchVer=" + exchVer + ", parts=" + fullMapString() + ']'); lock.writeLock().lock(); try { - if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) { + if (exchVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchVer) >= 0) { if (log.isDebugEnabled()) log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + - lastExchangeId + ", exchId=" + exchId + ']'); + lastExchangeVer + ", exchVer=" + exchVer + ']'); return null; } @@ -593,15 +592,15 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (node2part != null && node2part.compareTo(partMap) >= 0) { if (log.isDebugEnabled()) log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" + - lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']'); + lastExchangeVer + ", exchVer=" + exchVer + ", curMap=" + node2part + ", newMap=" + partMap + ']'); return null; } updateSeq.incrementAndGet(); - if (exchId != null) - lastExchangeId = exchId; + if (exchVer != null) + lastExchangeVer = exchVer; if (node2part != null) { for (GridDhtPartitionMap part : node2part.values()) { @@ -611,7 +610,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { // then we keep the newer value. 
if (newPart != null && newPart.updateSequence() < part.updateSequence()) { if (log.isDebugEnabled()) - log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" + + log.debug("Overriding partition map in full update map [exchVer=" + exchVer + ", curPart=" + mapString(part) + ", newPart=" + mapString(newPart) + ']'); partMap.put(part.nodeId(), part); @@ -708,16 +707,16 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (stopping) return null; - if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { + if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) { if (log.isDebugEnabled()) - log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + - lastExchangeId + ", exchId=" + exchId + ']'); + log.debug("Stale exchange id for single partition map update (will ignore) [lastExchVer=" + + lastExchangeVer + ", exchId=" + exchId + ']'); return null; } if (exchId != null) - lastExchangeId = exchId; + lastExchangeVer = exchId.topologyVersion(); if (node2part == null) // Create invalid partition map. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e44b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java index d9d642a..44c7b88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java @@ -32,6 +32,12 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage { private static final long serialVersionUID = 0L; /** */ + private static final int SND_PART_STATE_MASK = 0x01; + + /** */ + private byte flags; + + /** */ private long futId; /** Topology version being queried. */ @@ -48,16 +54,28 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage { * @param futId Future ID. * @param grpId Cache group ID. * @param topVer Topology version. + * @param sndPartMap {@code True} if the response should include the cache partitions state. */ public GridDhtAffinityAssignmentRequest( long futId, int grpId, - AffinityTopologyVersion topVer) { + AffinityTopologyVersion topVer, + boolean sndPartMap) { assert topVer != null; this.futId = futId; this.grpId = grpId; this.topVer = topVer; + + if (sndPartMap) + flags |= SND_PART_STATE_MASK; + } + + /** + * @return {@code True} if the response should include the cache partitions state. 
+ */ + public boolean sendPartitionsState() { + return (flags & SND_PART_STATE_MASK) != 0; } /** @@ -91,7 +109,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 5; + return 6; } /** {@inheritDoc} */ @@ -110,12 +128,18 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage { switch (writer.state()) { case 3: - if (!writer.writeLong("futId", futId)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); case 4: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 5: if (!writer.writeMessage("topVer", topVer)) return false; @@ -138,7 +162,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage { switch (reader.state()) { case 3: - futId = reader.readLong("futId"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -146,6 +170,14 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage { reader.incrementState(); case 4: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e44b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java index 4df3fc1..5b0de08 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java @@ -21,19 +21,20 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.UUID; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; /** * Affinity assignment response. @@ -62,6 +63,13 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { /** Affinity assignment bytes. */ private byte[] idealAffAssignmentBytes; + /** */ + @GridDirectTransient + private GridDhtPartitionFullMap partMap; + + /** */ + private byte[] partBytes; + /** * Empty constructor. */ @@ -107,30 +115,30 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { } /** - * @param disco Discovery manager. + * @param discoCache Discovery data cache. * @return Affinity assignment. 
*/ - public List<List<ClusterNode>> affinityAssignment(GridDiscoveryManager disco) { + public List<List<ClusterNode>> affinityAssignment(DiscoCache discoCache) { if (affAssignmentIds != null) - return nodes(disco, affAssignmentIds); + return nodes(discoCache, affAssignmentIds); return null; } /** - * @param disco Discovery manager. + * @param discoCache Discovery data cache. * @return Ideal affinity assignment. */ - public List<List<ClusterNode>> idealAffinityAssignment(GridDiscoveryManager disco) { - return nodes(disco, idealAffAssignment); + public List<List<ClusterNode>> idealAffinityAssignment(DiscoCache discoCache) { + return nodes(discoCache, idealAffAssignment); } /** - * @param disco Discovery manager. + * @param discoCache Discovery data cache. * @param assignmentIds Assignment node IDs. * @return Assignment nodes. */ - private List<List<ClusterNode>> nodes(GridDiscoveryManager disco, List<List<UUID>> assignmentIds) { + private List<List<ClusterNode>> nodes(DiscoCache discoCache, List<List<UUID>> assignmentIds) { if (assignmentIds != null) { List<List<ClusterNode>> assignment = new ArrayList<>(assignmentIds.size()); @@ -139,7 +147,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { List<ClusterNode> nodes = new ArrayList<>(ids.size()); for (int j = 0; j < ids.size(); j++) { - ClusterNode node = disco.node(topVer, ids.get(j)); + ClusterNode node = discoCache.node(ids.get(j)); assert node != null; @@ -163,6 +171,20 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { } /** + * @param partMap Partition map. + */ + public void partitionMap(GridDhtPartitionFullMap partMap) { + this.partMap = partMap; + } + + /** + * @return Partition map. + */ + @Nullable public GridDhtPartitionFullMap partitionMap() { + return partMap; + } + + /** * @param assignments Assignment. * @return Assignment where cluster nodes are converted to their ids. 
*/ @@ -193,7 +215,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 8; } /** @@ -208,6 +230,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { if (idealAffAssignment != null && idealAffAssignmentBytes == null) idealAffAssignmentBytes = U.marshal(ctx, idealAffAssignment); + + if (partMap != null && partBytes == null) + partBytes = U.zip(U.marshal(ctx.marshaller(), partMap)); } /** {@inheritDoc} */ @@ -222,6 +247,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { if (idealAffAssignmentBytes != null && idealAffAssignment == null) idealAffAssignment = U.unmarshal(ctx, idealAffAssignmentBytes, ldr); + + if (partBytes != null && partMap == null) + partMap = U.unmarshalZip(ctx.marshaller(), partBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } /** {@inheritDoc} */ @@ -263,6 +291,12 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { writer.incrementState(); case 6: + if (!writer.writeByteArray("partBytes", partBytes)) + return false; + + writer.incrementState(); + + case 7: if (!writer.writeMessage("topVer", topVer)) return false; @@ -309,6 +343,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { reader.incrementState(); case 6: + partBytes = reader.readByteArray("partBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e44b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java index 8746320..9595ba4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@ -76,25 +76,28 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin /** */ private final int grpId; + /** */ + private boolean needPartState; + /** * @param ctx Context. - * @param grpDesc Group descriptor. + * @param grpId Group ID. * @param topVer Topology version. * @param discoCache Discovery cache. */ public GridDhtAssignmentFetchFuture( GridCacheSharedContext ctx, - CacheGroupDescriptor grpDesc, + int grpId, AffinityTopologyVersion topVer, DiscoCache discoCache ) { this.topVer = topVer; - this.grpId = grpDesc.groupId(); + this.grpId = grpId; this.ctx = ctx; id = idGen.getAndIncrement(); - Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpDesc.groupId()); + Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpId); LinkedList<ClusterNode> tmp = new LinkedList<>(); @@ -127,8 +130,12 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin /** * Initializes fetch future. + * + * @param needPartState {@code True} if the partitions state should be fetched as well. 
*/ - public void init() { + public void init(boolean needPartState) { + this.needPartState = needPartState; + ctx.affinity().addDhtAssignmentFetchFuture(this); requestFromNextNode(); @@ -195,7 +202,7 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin ", node=" + node + ']'); ctx.io().send(node, - new GridDhtAffinityAssignmentRequest(id, grpId, topVer), + new GridDhtAffinityAssignmentRequest(id, grpId, topVer, needPartState), AFFINITY_POOL); // Close window for listener notification. http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e44b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 4e0608d..d6b94de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@ -54,15 +55,15 @@ public interface GridDhtPartitionTopology { /** * Updates topology version. * - * @param exchId Exchange ID. 
* @param exchFut Exchange future. * @param updateSeq Update sequence. * @param stopping Stopping flag. * @throws IgniteInterruptedCheckedException If interrupted. */ public void updateTopologyVersion( - GridDhtPartitionExchangeId exchId, - GridDhtPartitionsExchangeFuture exchFut, + AffinityTopologyVersion exchVer, + GridDhtTopologyFuture exchFut, + DiscoCache discoCache, long updateSeq, boolean stopping ) throws IgniteInterruptedCheckedException; @@ -233,12 +234,13 @@ public interface GridDhtPartitionTopology { public void onRemoved(GridDhtCacheEntry e); /** - * @param exchFut Exchange future. + * @param exchangeVer Exchange version. * @param partMap Update partition map. * @param cntrMap Partition update counters. * @return Local partition map if there were evictions or {@code null} otherwise. */ - public GridDhtPartitionMap update(@Nullable GridDhtPartitionsExchangeFuture exchFut, + public GridDhtPartitionMap update( + @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, @Nullable Map<Integer, T2<Long, Long>> cntrMap, Set<Integer> partsToReload); http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e44b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 19e48eb..f9d7e94 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -98,7 +98,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private final 
Map<Integer, Set<UUID>> part2node; /** */ - private GridDhtPartitionExchangeId lastExchangeId; + private AffinityTopologyVersion lastExchangeVer; /** */ private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; @@ -162,7 +162,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { part2node.clear(); - lastExchangeId = null; + lastExchangeVer = null; updateSeq.set(1); @@ -209,16 +209,17 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public void updateTopologyVersion( - GridDhtPartitionExchangeId exchId, - GridDhtPartitionsExchangeFuture exchFut, + AffinityTopologyVersion exchTopVer, + GridDhtTopologyFuture exchFut, + DiscoCache discoCache, long updSeq, boolean stopping ) throws IgniteInterruptedCheckedException { U.writeLock(lock); try { - assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer + - ", exchId=" + exchId + + assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer + + ", exchTopVer=" + exchTopVer + ", fut=" + exchFut + ']'; this.stopping = stopping; @@ -229,9 +230,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { rebalancedTopVer = AffinityTopologyVersion.NONE; - topVer = exchId.topologyVersion(); + topVer = exchTopVer; - discoCache = exchFut.discoCache(); + this.discoCache = discoCache; } finally { lock.writeLock().unlock(); @@ -1015,15 +1016,13 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Override public GridDhtPartitionMap update( - @Nullable GridDhtPartitionsExchangeFuture exchFut, + @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, @Nullable Map<Integer, T2<Long, Long>> cntrMap, Set<Integer> partsToReload ) { - GridDhtPartitionExchangeId exchId = exchFut != null ? 
exchFut.exchangeId() : null; - if (log.isDebugEnabled()) - log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); + log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']'); assert partMap != null; @@ -1056,25 +1055,24 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - //if need skip - if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) { + if (exchangeVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchangeVer) >= 0) { if (log.isDebugEnabled()) - log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + - lastExchangeId + ", exchId=" + exchId + ']'); + log.debug("Stale exchange id for full partition map update (will ignore) [lastExch=" + + lastExchangeVer + ", exch=" + exchangeVer + ']'); return null; } if (node2part != null && node2part.compareTo(partMap) >= 0) { if (log.isDebugEnabled()) - log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" + - lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']'); + log.debug("Stale partition map for full partition map update (will ignore) [lastExch=" + + lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']'); return null; } - if (exchId != null) - lastExchangeId = exchId; + if (exchangeVer != null) + lastExchangeVer = exchangeVer; if (node2part != null) { for (GridDhtPartitionMap part : node2part.values()) { @@ -1087,8 +1085,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { (grp.localStartVersion().compareTo(newPart.topologyVersion()) > 0)) ) { if (log.isDebugEnabled()) - log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" + - mapString(part) + ", newPart=" + mapString(newPart) + ']'); + log.debug("Overriding partition map in 
full update map [exch=" + exchangeVer + + ", curPart=" + mapString(part) + ", newPart=" + mapString(newPart) + ']'); partMap.put(part.nodeId(), part); } @@ -1284,16 +1282,16 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (stopping) return null; - if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { + if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) { if (log.isDebugEnabled()) - log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + - lastExchangeId + ", exchId=" + exchId + ']'); + log.debug("Stale exchange id for single partition map update (will ignore) [lastExch=" + + lastExchangeVer + ", exch=" + exchId.topologyVersion() + ']'); return null; } if (exchId != null) - lastExchangeId = exchId; + lastExchangeVer = exchId.topologyVersion(); if (node2part == null) // Create invalid partition map. http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e44b6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 88dc863..6679b83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -760,14 +760,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT boolean updateTop = 
exchId.topologyVersion().equals(grp.localStartVersion()); if (updateTop && clientTop != null) - top.update(this, clientTop.partitionMap(true), clientTop.updateCounters(false), Collections.<Integer>emptySet()); + top.update(topologyVersion(), clientTop.partitionMap(true), clientTop.updateCounters(false), Collections.<Integer>emptySet()); } - top.updateTopologyVersion(exchId, this, updSeq, cacheGroupStopping(grp.groupId())); + top.updateTopologyVersion(topologyVersion(), + this, + discoCache(), + updSeq, + cacheGroupStopping(grp.groupId())); } for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) - top.updateTopologyVersion(exchId, this, -1, cacheGroupStopping(top.groupId())); + top.updateTopologyVersion(exchId.topologyVersion(), this, discoCache(), -1, cacheGroupStopping(top.groupId())); } /** @@ -872,7 +876,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert fullMap != null; - grp.topology().update(this, + grp.topology().update(topologyVersion(), fullMap, top.updateCounters(false), Collections.<Integer>emptySet()); @@ -2103,14 +2107,22 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - if (grp != null) - grp.topology().update(this, entry.getValue(), cntrMap, - msg.partsToReload(cctx.localNodeId(), grpId)); + if (grp != null) { + grp.topology().update(topologyVersion(), + entry.getValue(), + cntrMap, + msg.partsToReload(cctx.localNodeId(), + grpId)); + } else { ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); - if (oldest != null && oldest.isLocal()) - cctx.exchange().clientTopology(grpId, this).update(this, entry.getValue(), cntrMap, Collections.<Integer>emptySet()); + if (oldest != null && oldest.isLocal()) { + cctx.exchange().clientTopology(grpId, this).update(topologyVersion(), + entry.getValue(), + cntrMap, + Collections.<Integer>emptySet()); + } } } }
