ignite-5272 Do not use synchronous custom discovery event for client cache start/close
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4026ddcc Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4026ddcc Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4026ddcc Branch: refs/heads/master Commit: 4026ddcc2161ca4bf5ad3dc04cabd284c608738a Parents: 3fd9e04 Author: sboikov <[email protected]> Authored: Thu Jun 15 09:23:36 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Jun 15 09:23:36 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 12 + .../internal/managers/discovery/DiscoCache.java | 17 - .../discovery/GridDiscoveryManager.java | 88 ++- .../affinity/GridAffinityProcessor.java | 24 +- .../cache/CacheAffinitySharedManager.java | 612 +++++++++++++++---- .../processors/cache/CacheGroupContext.java | 3 + .../ClientCacheChangeDiscoveryMessage.java | 176 ++++++ .../ClientCacheChangeDummyDiscoveryMessage.java | 104 ++++ .../cache/ClientCacheUpdateTimeout.java | 44 ++ .../processors/cache/ClusterCachesInfo.java | 94 ++- .../cache/DynamicCacheChangeRequest.java | 30 - .../processors/cache/ExchangeActions.java | 87 +-- .../processors/cache/GridCacheContext.java | 9 +- .../processors/cache/GridCacheIoManager.java | 213 +++++-- .../GridCachePartitionExchangeManager.java | 11 +- .../processors/cache/GridCacheProcessor.java | 364 ++++++----- .../dht/ClientCacheDhtTopologyFuture.java | 78 +++ .../dht/GridClientPartitionTopology.java | 41 +- .../dht/GridDhtAffinityAssignmentRequest.java | 40 +- .../dht/GridDhtAffinityAssignmentResponse.java | 66 +- .../dht/GridDhtAssignmentFetchFuture.java | 22 +- .../distributed/dht/GridDhtCacheEntry.java | 10 +- .../dht/GridDhtPartitionTopology.java | 14 +- .../dht/GridDhtPartitionTopologyImpl.java | 49 +- .../dht/GridDhtTopologyFutureAdapter.java | 195 ++++++ .../dht/GridDhtTxPrepareRequest.java | 43 +- .../dht/GridDhtTxPrepareResponse.java | 6 +- 
.../dht/atomic/GridDhtAtomicCache.java | 14 +- .../dht/atomic/GridDhtAtomicUpdateResponse.java | 11 +- .../GridDhtPartitionsExchangeFuture.java | 201 ++---- .../cache/transactions/IgniteTxEntry.java | 7 + .../cache/transactions/IgniteTxHandler.java | 13 +- .../cache/transactions/IgniteTxManager.java | 27 + .../GridProjectionForCachesSelfTest.java | 24 +- .../IgniteClientReconnectCacheTest.java | 29 +- .../cache/CacheStopAndDestroySelfTest.java | 38 +- .../cache/GridCacheClearSelfTest.java | 15 +- ...IgniteClientCacheInitializationFailTest.java | 6 +- .../IgniteClientCacheStartFailoverTest.java | 585 ++++++++++++++++++ .../IgniteDynamicClientCacheStartSelfTest.java | 248 +++++++- .../cache/IgniteNearClientCacheCloseTest.java | 262 ++++++++ ...gniteTopologyValidatorAbstractCacheTest.java | 178 ++++-- ...ologyValidatorAbstractTxCacheGroupsTest.java | 12 +- ...iteTopologyValidatorAbstractTxCacheTest.java | 28 +- .../CacheLateAffinityAssignmentTest.java | 16 +- ...teCacheClientNodePartitionsExchangeTest.java | 23 +- .../testframework/junits/GridAbstractTest.java | 2 + .../testsuites/IgniteCacheTestSuite2.java | 4 + 48 files changed, 3270 insertions(+), 925 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 539f288..d9c112c 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Properties; import javax.net.ssl.HostnameVerifier; +import org.apache.ignite.cluster.ClusterGroup; import 
org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -623,6 +624,17 @@ public final class IgniteSystemProperties { public static final String IGNITE_SECURITY_COMPATIBILITY_MODE = "IGNITE_SECURITY_COMPATIBILITY_MODE"; /** + * When client cache is started or closed special discovery message is sent to notify cluster (for example this is + * needed for {@link ClusterGroup#forCacheNodes(String)} API. This timeout specifies how long to wait + * after client cache start/close before sending this message. If during this timeout another client + * cache changed, these events are combined into single message. + * <p> + * Default is 10 seconds. + */ + public static final String IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT = + "IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index 22c2d07..2b3c4fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -74,10 +74,6 @@ public class DiscoCache { /** Node map. */ private final Map<UUID, ClusterNode> nodeMap; - /** Caches where at least one node has near cache enabled. */ - @GridToStringInclude - private final Set<Integer> nearEnabledCaches; - /** Alive nodes. 
*/ private final Set<UUID> alives = new GridConcurrentHashSet<>(); @@ -93,7 +89,6 @@ public class DiscoCache { * @param allCacheNodes Cache nodes by cache name. * @param cacheGrpAffNodes Affinity nodes by cache group ID. * @param nodeMap Node map. - * @param nearEnabledCaches Caches where at least one node has near cache enabled. * @param alives Alive nodes. */ DiscoCache(ClusterNode loc, @@ -107,7 +102,6 @@ public class DiscoCache { Map<Integer, List<ClusterNode>> allCacheNodes, Map<Integer, List<ClusterNode>> cacheGrpAffNodes, Map<UUID, ClusterNode> nodeMap, - Set<Integer> nearEnabledCaches, Set<UUID> alives) { this.loc = loc; this.rmtNodes = rmtNodes; @@ -120,7 +114,6 @@ public class DiscoCache { this.allCacheNodes = allCacheNodes; this.cacheGrpAffNodes = cacheGrpAffNodes; this.nodeMap = nodeMap; - this.nearEnabledCaches = nearEnabledCaches; this.alives.addAll(alives); } @@ -243,16 +236,6 @@ public class DiscoCache { } /** - * Checks if cache with given ID has at least one node with near cache enabled. - * - * @param cacheId Cache ID. - * @return {@code True} if cache with given name has at least one node with near cache enabled. - */ - public boolean hasNearCache(int cacheId) { - return nearEnabledCaches.contains(cacheId); - } - - /** * @param id Node ID. * @return Node. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index c91ff74..0f0d1c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -71,8 +71,11 @@ import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; +import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessage; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; @@ -357,11 +360,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * Called from discovery thread. Adds dynamic cache filter. * + * @param cacheId Cache ID. * @param grpId Cache group ID. * @param cacheName Cache name. * @param nearEnabled Near enabled flag. 
*/ public void setCacheFilter( + int cacheId, int grpId, String cacheName, boolean nearEnabled @@ -374,7 +379,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (grp.cacheMode == CacheMode.REPLICATED) nearEnabled = false; - registeredCaches.put(cacheName, new CachePredicate(grp, nearEnabled)); + registeredCaches.put(cacheName, new CachePredicate(cacheId, grp, nearEnabled)); } } @@ -586,8 +591,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { assert customMsg != null; boolean incMinorTopVer = ctx.cache().onCustomEvent( - customMsg, new AffinityTopologyVersion(topVer, minorTopVer) - ); + customMsg, + new AffinityTopologyVersion(topVer, minorTopVer), + node); if (incMinorTopVer) { minorTopVer++; @@ -1860,17 +1866,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Checks if cache with given ID has at least one node with near cache enabled. - * - * @param cacheId Cache ID. - * @param topVer Topology version. - * @return {@code True} if cache with given name has at least one node with near cache enabled. - */ - public boolean hasNearCache(int cacheId, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheId, topVer).hasNearCache(cacheId); - } - - /** * Gets discovery cache for given topology version. * * @param grpId Cache group ID (participates in exception message). @@ -1994,6 +1989,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * @param reqId Start request ID. + * @param startReqs Cache start requests. + * @param cachesToClose Cache to close. 
+ */ + public void clientCacheStartEvent(UUID reqId, + @Nullable Map<String, DynamicCacheChangeRequest> startReqs, + @Nullable Set<String> cachesToClose) { + discoWrk.addEvent(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, + AffinityTopologyVersion.NONE, + localNode(), + null, + Collections.<ClusterNode>emptyList(), + new ClientCacheChangeDummyDiscoveryMessage(reqId, startReqs, cachesToClose)); + } + + /** * Gets first grid node start time, see {@link DiscoverySpi#getGridStartTime()}. * * @return Start time of the first grid node. @@ -2109,8 +2120,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); - Set<Integer> nearEnabledCaches = new HashSet<>(); - for (ClusterNode node : allNodes) { assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']'; assert !node.isDaemon(); @@ -2143,9 +2152,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { rmtNodesWithCaches.add(node); addToMap(allCacheNodes, cacheName, node); - - if (filter.nearNode(node)) - nearEnabledCaches.add(CU.cacheId(cacheName)); } } } @@ -2162,7 +2168,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Collections.unmodifiableMap(allCacheNodes), Collections.unmodifiableMap(cacheGrpAffNodes), Collections.unmodifiableMap(nodeMap), - Collections.unmodifiableSet(nearEnabledCaches), alives); } @@ -2808,23 +2813,32 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * Cache predicate. */ - private static class CachePredicate { + private class CachePredicate { + /** */ + private final int cacheId; + /** Cache filter. */ private final CacheGroupAffinity aff; /** If near cache is enabled on data nodes. */ private final boolean nearEnabled; - /** Collection of client near nodes. 
*/ + /** + * Collection of client nodes. + * + * Note: if client cache started/closed this map is updated asynchronously. + */ private final ConcurrentHashMap<UUID, Boolean> clientNodes; /** + * @param cacheId Cache ID. * @param aff Cache group affinity. * @param nearEnabled Near enabled flag. */ - private CachePredicate(CacheGroupAffinity aff, boolean nearEnabled) { + private CachePredicate(int cacheId, CacheGroupAffinity aff, boolean nearEnabled) { assert aff != null; + this.cacheId = cacheId; this.aff = aff; this.nearEnabled = nearEnabled; @@ -2836,7 +2850,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param nearEnabled Near enabled flag. * @return {@code True} if new node ID was added. */ - public boolean addClientNode(UUID nodeId, boolean nearEnabled) { + boolean addClientNode(UUID nodeId, boolean nearEnabled) { assert nodeId != null; Boolean old = clientNodes.putIfAbsent(nodeId, nearEnabled); @@ -2868,19 +2882,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param node Node to check. * @return {@code True} if cache is accessible on the given node. */ - public boolean cacheNode(ClusterNode node) { - return !node.isDaemon() && (CU.affinityNode(node, aff.cacheFilter) || clientNodes.containsKey(node.id())); + boolean cacheNode(ClusterNode node) { + return !node.isDaemon() && (CU.affinityNode(node, aff.cacheFilter) || + cacheClientNode(node) != null); } /** * @param node Node to check. * @return {@code True} if near cache is present on the given nodes. 
*/ - public boolean nearNode(ClusterNode node) { + boolean nearNode(ClusterNode node) { if (CU.affinityNode(node, aff.cacheFilter)) return nearEnabled; - Boolean near = clientNodes.get(node.id()); + Boolean near = cacheClientNode(node); return near != null && near; } @@ -2893,9 +2908,24 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (node.isDaemon()) return false; - Boolean near = clientNodes.get(node.id()); + Boolean near = cacheClientNode(node); return near != null && !near; } + + /** + * @param node Node. + * @return {@code Null} if client cache does not exist, otherwise cache near enabled flag. + */ + private Boolean cacheClientNode(ClusterNode node) { + // On local node check actual cache state since clientNodes map is updated asynchronously. + if (ctx.localNodeId().equals(node.id())) { + GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId); + + return cctx != null ? CU.isNearEnabled(cctx) : null; + } + + return clientNodes.get(node.id()); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index 87c424a..6b43fc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -39,6 +39,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; +import 
org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; @@ -364,18 +365,9 @@ public class GridAffinityProcessor extends GridProcessorAdapter { if (fut != null) return fut.get(); - ClusterNode loc = ctx.discovery().localNode(); - - // Check local node. - Collection<ClusterNode> cacheNodes = ctx.discovery().cacheNodes(cacheName, topVer); - - if (cacheNodes.contains(loc)) { - GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); - - // Cache is being stopped. - if (cache == null) - return null; + GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); + if (cache != null) { GridCacheContext<Object, Object> cctx = cache.context(); cctx.awaitStarted(); @@ -412,6 +404,8 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } } + Collection<ClusterNode> cacheNodes = ctx.discovery().cacheNodes(cacheName, topVer); + if (F.isEmpty(cacheNodes)) return null; @@ -443,7 +437,13 @@ public class GridAffinityProcessor extends GridProcessorAdapter { CacheMode mode = ctx.cache().cacheMode(cacheName); - assert mode != null; + if (mode == null) { + if (ctx.clientDisconnected()) + throw new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), + "Failed to get affinity mapping, client disconnected."); + + throw new IgniteCheckedException("No cache nodes in topology for cache name: " + cacheName); + } // Map all keys to a single node, if the cache mode is LOCAL. 
if (mode == LOCAL) { http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index b6fe90b..0f62db2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -28,7 +29,9 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -36,13 +39,18 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; +import org.apache.ignite.internal.processors.cache.distributed.dht.ClientCacheDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -68,6 +76,10 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; */ @SuppressWarnings("ForLoopReplaceableByForEach") public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { + /** */ + private final long clientCacheMsgTimeout = + IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT, 10_000); + /** Late affinity assignment flag. */ private boolean lateAffAssign; @@ -81,7 +93,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private AffinityTopologyVersion lastAffVer; /** Registered caches (updated from exchange thread). 
*/ - private final Map<Integer, CacheGroupDescriptor> registeredGrps = new HashMap<>(); + private final CachesInfo caches = new CachesInfo(); /** */ private WaitRebalanceInfo waitInfo; @@ -93,6 +105,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private final ConcurrentMap<Long, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts = new ConcurrentHashMap8<>(); + /** */ + private final ThreadLocal<ClientCacheChangeDiscoveryMessage> clientCacheChanges = new ThreadLocal<>(); + /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -126,14 +141,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { if (type == EVT_NODE_JOINED && node.isLocal()) { // Clean-up in case of client reconnect. - registeredGrps.clear(); + caches.clear(); affCalcVer = null; lastAffVer = null; - for (CacheGroupDescriptor desc : cctx.cache().cacheGroupDescriptors().values()) - registeredGrps.put(desc.groupId(), desc); + caches.init(cctx.cache().cacheGroupDescriptors(), cctx.cache().cacheDescriptors()); } if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) { @@ -319,20 +333,312 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param exchActions Cache change requests to execute on exchange. + * @param reqId Request ID. + * @param startReqs Client cache start request. + * @return Descriptors for caches to start. 
*/ - private void updateCachesInfo(ExchangeActions exchActions) { - for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) { - CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId()); + @Nullable private List<DynamicCacheDescriptor> clientCachesToStart(UUID reqId, + Map<String, DynamicCacheChangeRequest> startReqs) { + List<DynamicCacheDescriptor> startDescs = new ArrayList<>(startReqs.size()); + + for (DynamicCacheChangeRequest startReq : startReqs.values()) { + DynamicCacheDescriptor desc = caches.cache(CU.cacheId(startReq.cacheName())); + + if (desc == null) { + CacheException err = new CacheException("Failed to start client cache " + + "(a cache with the given name is not started): " + startReq.cacheName()); + + cctx.cache().completeClientCacheChangeFuture(reqId, err); - assert rmvd != null : stopDesc.cacheOrGroupName(); + return null; + } + + if (cctx.cacheContext(desc.cacheId()) != null) + continue; + + startDescs.add(desc); } - for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) { - CacheGroupDescriptor old = registeredGrps.put(startDesc.groupId(), startDesc); + return startDescs; + } - assert old == null : old; + /** + * @param msg Change request. + * @param crd Coordinator flag. + * @param topVer Current topology version. + * @param discoCache Discovery data cache. + * @return Map of started caches (cache ID to near enabled flag). 
+ */ + @Nullable private Map<Integer, Boolean> processClientCacheStartRequests( + ClientCacheChangeDummyDiscoveryMessage msg, + boolean crd, + AffinityTopologyVersion topVer, + DiscoCache discoCache) { + Map<String, DynamicCacheChangeRequest> startReqs = msg.startRequests(); + + if (startReqs == null) + return null; + + List<DynamicCacheDescriptor> startDescs = clientCachesToStart(msg.requestId(), msg.startRequests()); + + if (startDescs == null || startDescs.isEmpty()) { + cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null); + + return null; + } + + Map<Integer, GridDhtAssignmentFetchFuture> fetchFuts = U.newHashMap(startDescs.size()); + + Set<String> startedCaches = U.newHashSet(startDescs.size()); + + Map<Integer, Boolean> startedInfos = U.newHashMap(startDescs.size()); + + for (DynamicCacheDescriptor desc : startDescs) { + try { + startedCaches.add(desc.cacheName()); + + DynamicCacheChangeRequest startReq = startReqs.get(desc.cacheName()); + + cctx.cache().prepareCacheStart(desc, startReq.nearCacheConfiguration(), topVer); + + startedInfos.put(desc.cacheId(), startReq.nearCacheConfiguration() != null); + + CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId()); + + assert grp != null : desc.groupId(); + assert !grp.affinityNode() || grp.isLocal() : grp.cacheOrGroupName(); + + if (!grp.isLocal() && grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) { + assert grp.localStartVersion().equals(topVer) : grp.localStartVersion(); + + if (crd) { + CacheGroupHolder grpHolder = grpHolders.get(grp.groupId()); + + assert grpHolder != null && grpHolder.affinity().idealAssignment() != null; + + if (grpHolder.client()) { + ClientCacheDhtTopologyFuture topFut = new ClientCacheDhtTopologyFuture(topVer); + + grp.topology().updateTopologyVersion(topFut, discoCache, -1, false); + + GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(grp.groupId()); + + if (clientTop != null) { + grp.topology().update(topVer, + 
clientTop.partitionMap(true), + clientTop.updateCounters(false)); + } + + grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity()); + + grpHolders.put(grp.groupId(), grpHolder); + + assert grpHolder.affinity().lastVersion().equals(grp.affinity().lastVersion()); + } + } + else if (!fetchFuts.containsKey(grp.groupId())) { + GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, + grp.groupId(), + topVer, + discoCache); + + fetchFut.init(true); + + fetchFuts.put(grp.groupId(), fetchFut); + } + } + } + catch (IgniteCheckedException e) { + cctx.cache().closeCaches(startedCaches, false); + + cctx.cache().completeClientCacheChangeFuture(msg.requestId(), e); + + return null; + } } + + for (GridDhtAssignmentFetchFuture fetchFut : fetchFuts.values()) { + try { + CacheGroupContext grp = cctx.cache().cacheGroup(fetchFut.groupId()); + + assert grp != null; + + GridDhtAffinityAssignmentResponse res = fetchAffinity(topVer, + null, + discoCache, + grp.affinity(), + fetchFut); + + GridDhtPartitionFullMap partMap; + ClientCacheDhtTopologyFuture topFut; + + if (res != null) { + partMap = res.partitionMap(); + + assert partMap != null : res; + + topFut = new ClientCacheDhtTopologyFuture(topVer); + } + else { + partMap = new GridDhtPartitionFullMap(cctx.localNodeId(), cctx.localNode().order(), 1); + + topFut = new ClientCacheDhtTopologyFuture(topVer, + new ClusterTopologyServerNotFoundException("All server nodes left grid.")); + } + + grp.topology().updateTopologyVersion(topFut, discoCache, -1, false); + + grp.topology().update(topVer, partMap, null); + + topFut.validate(grp, discoCache.allNodes()); + } + catch (IgniteCheckedException e) { + cctx.cache().closeCaches(startedCaches, false); + + cctx.cache().completeClientCacheChangeFuture(msg.requestId(), e); + + return null; + } + } + + cctx.cache().initCacheProxies(topVer, null); + + cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null); + + return startedInfos; + } + + /** + * @param msg 
Change request. + * @param topVer Current topology version. + * @param crd Coordinator flag. + * @return Closed caches IDs. + */ + private Set<Integer> processCacheCloseRequests( + ClientCacheChangeDummyDiscoveryMessage msg, + boolean crd, + AffinityTopologyVersion topVer) { + Set<String> cachesToClose = msg.cachesToClose(); + + if (cachesToClose == null) + return null; + + Set<Integer> closed = cctx.cache().closeCaches(cachesToClose, true); + + if (crd) { + for (CacheGroupHolder hld : grpHolders.values()) { + if (!hld.client() && cctx.cache().cacheGroup(hld.groupId()) == null) { + int grpId = hld.groupId(); + + // All client cache groups were stopped, need create 'client' CacheGroupHolder. + CacheGroupHolder grpHolder = grpHolders.remove(grpId); + + assert grpHolder != null && !grpHolder.client() : grpHolder; + + try { + grpHolder = CacheGroupHolder2.create(cctx, + caches.group(grpId), + topVer, + grpHolder.affinity()); + + grpHolders.put(grpId, grpHolder); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to initialize cache: " + e, e); + } + } + } + } + + cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null); + + return closed; + } + + /** + * Process client cache start/close requests, called from exchange thread. + * + * @param msg Change request. 
+ */ + void processClientCachesChanges(ClientCacheChangeDummyDiscoveryMessage msg) { + AffinityTopologyVersion topVer = cctx.exchange().readyAffinityVersion(); + + DiscoCache discoCache = cctx.discovery().discoCache(topVer); + + boolean crd = cctx.localNode().equals(discoCache.oldestAliveServerNode()); + + Map<Integer, Boolean> startedCaches = processClientCacheStartRequests(msg, crd, topVer, discoCache); + + Set<Integer> closedCaches = processCacheCloseRequests(msg, crd, topVer); + + if (startedCaches != null || closedCaches != null) + scheduleClientChangeMessage(startedCaches, closedCaches); + } + + /** + * Sends discovery message about started/closed client caches, called from exchange thread. + * + * @param timeoutObj Timeout object initiated send. + */ + void sendClientCacheChangesMessage(ClientCacheUpdateTimeout timeoutObj) { + ClientCacheChangeDiscoveryMessage msg = clientCacheChanges.get(); + + // Timeout object was changed if one more client cache changed during timeout, + // another timeoutObj was scheduled. + if (msg != null && msg.updateTimeoutObject() == timeoutObj) { + assert !msg.empty() : msg; + + clientCacheChanges.remove(); + + msg.checkCachesExist(caches.registeredCaches.keySet()); + + try { + if (!msg.empty()) + cctx.discovery().sendCustomEvent(msg); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send discovery event: " + e, e); + } + } + } + + /** + * @param startedCaches Started caches. + * @param closedCaches Closed caches. 
+ */ + private void scheduleClientChangeMessage(Map<Integer, Boolean> startedCaches, Set<Integer> closedCaches) { + ClientCacheChangeDiscoveryMessage msg = clientCacheChanges.get(); + + if (msg == null) { + msg = new ClientCacheChangeDiscoveryMessage(startedCaches, closedCaches); + + clientCacheChanges.set(msg); + } + else { + msg.merge(startedCaches, closedCaches); + + if (msg.empty()) { + cctx.time().removeTimeoutObject(msg.updateTimeoutObject()); + + clientCacheChanges.remove(); + + return; + } + } + + if (msg.updateTimeoutObject() != null) + cctx.time().removeTimeoutObject(msg.updateTimeoutObject()); + + long timeout = clientCacheMsgTimeout; + + if (timeout <= 0) + timeout = 10_000; + + ClientCacheUpdateTimeout timeoutObj = new ClientCacheUpdateTimeout(cctx, timeout); + + msg.updateTimeoutObject(timeoutObj); + + cctx.time().addTimeoutObject(timeoutObj); } /** @@ -342,16 +648,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param crd Coordinator flag. * @param exchActions Cache change requests. * @throws IgniteCheckedException If failed. - * @return {@code True} if client-only exchange is needed. */ - public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut, + public void onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut, boolean crd, final ExchangeActions exchActions) throws IgniteCheckedException { assert exchActions != null && !exchActions.empty() : exchActions; - updateCachesInfo(exchActions); + caches.updateCachesInfo(exchActions); // Affinity did not change for existing caches. 
forAllCacheGroups(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() { @@ -363,7 +668,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } }); - for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) { + for (ExchangeActions.ActionData action : exchActions.cacheStartRequests()) { DynamicCacheDescriptor cacheDesc = action.descriptor(); DynamicCacheChangeRequest req = action.request(); @@ -396,81 +701,34 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap U.error(log, "Failed to initialize cache. Will try to rollback cache start routine. " + "[cacheName=" + req.cacheName() + ']', e); - cctx.cache().forceCloseCache(fut.topologyVersion(), action, e); - } - } - - Set<Integer> gprs = new HashSet<>(); - - for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) { - Integer grpId = action.descriptor().groupId(); + cctx.cache().closeCaches(Collections.singleton(req.cacheName()), false); - if (gprs.add(grpId)) { - if (crd && lateAffAssign) - initStartedGroupOnCoordinator(fut, action.descriptor().groupDescriptor());else { - CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - - if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.topologyVersion())) { - assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion(); - - initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut); - } - } + cctx.cache().completeCacheStartFuture(req, false, e); } } - List<ExchangeActions.ActionData> closeReqs = exchActions.closeRequests(cctx.localNodeId()); - - for (ExchangeActions.ActionData req : closeReqs) { - cctx.cache().blockGateway(req.request()); - - if (crd) { - CacheGroupContext grp = cctx.cache().cacheGroup(req.descriptor().groupId()); - - assert grp != null; - - if (grp.affinityNode()) - continue; - - boolean grpClosed = false; - - if (grp.sharedGroup()) { - 
boolean cacheRemaining = false; - - for (GridCacheContext ctx : cctx.cacheContexts()) { - if (ctx.group() == grp && !cacheClosed(ctx.cacheId(), closeReqs)) { - cacheRemaining = true; - - break; - } - } - - if (!cacheRemaining) - grpClosed = true; - } - else - grpClosed = true; + Set<Integer> gprs = new HashSet<>(); - // All client cache groups were stopped, need create 'client' CacheGroupHolder. - if (grpClosed) { - CacheGroupHolder grpHolder = grpHolders.remove(grp.groupId()); + for (ExchangeActions.ActionData action : exchActions.cacheStartRequests()) { + Integer grpId = action.descriptor().groupId(); - if (grpHolder != null) { - assert !grpHolder.client() : grpHolder; + if (gprs.add(grpId)) { + if (crd && lateAffAssign) + initStartedGroupOnCoordinator(fut, action.descriptor().groupDescriptor()); + else { + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - grpHolder = CacheGroupHolder2.create(cctx, - registeredGrps.get(grp.groupId()), - fut, - grp.affinity()); + if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.topologyVersion())) { + assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion(); - grpHolders.put(grp.groupId(), grpHolder); + initAffinity(caches.group(grp.groupId()), grp.affinity(), fut); } } } } for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) - cctx.cache().blockGateway(action.request()); + cctx.cache().blockGateway(action.request().cacheName(), true); Set<Integer> stoppedGrps = null; @@ -519,21 +777,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } } - return exchActions.clientOnlyExchange(); - } + ClientCacheChangeDiscoveryMessage msg = clientCacheChanges.get(); - /** - * @param cacheId Cache ID. - * @param closeReqs Close requests. - * @return {@code True} if requests contain request for given cache ID. 
- */ - private boolean cacheClosed(int cacheId, List<ExchangeActions.ActionData> closeReqs) { - for (ExchangeActions.ActionData req : closeReqs) { - if (req.descriptor().cacheId() == cacheId) - return true; - } + if (msg != null) { + msg.checkCachesExist(caches.registeredCaches.keySet()); - return false; + if (msg.empty()) + clientCacheChanges.remove(); + } } /** @@ -542,7 +793,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap public void removeAllCacheInfo(){ grpHolders.clear(); - registeredGrps.clear(); + caches.clear(); } /** @@ -633,7 +884,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert affTopVer.topologyVersion() > 0 : affTopVer; - CacheGroupDescriptor desc = registeredGrps.get(aff.groupId()); + CacheGroupDescriptor desc = caches.group(aff.groupId()); assert desc != null : aff.cacheOrGroupName(); @@ -763,7 +1014,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private void forAllRegisteredCacheGroups(IgniteInClosureX<CacheGroupDescriptor> c) throws IgniteCheckedException { assert lateAffAssign; - for (CacheGroupDescriptor cacheDesc : registeredGrps.values()) { + for (CacheGroupDescriptor cacheDesc : caches.allGroups()) { if (cacheDesc.config().getCacheMode() == LOCAL) continue; @@ -811,7 +1062,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (grpHolder == null) { grpHolder = grp != null ? 
new CacheGroupHolder1(grp, null) : - CacheGroupHolder2.create(cctx, grpDesc, fut, null); + CacheGroupHolder2.create(cctx, grpDesc, fut.topologyVersion(), null); CacheGroupHolder old = grpHolders.put(grpId, grpHolder); @@ -843,12 +1094,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap public void initStartedCaches(boolean crd, final GridDhtPartitionsExchangeFuture fut, Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException { - for (DynamicCacheDescriptor desc : descs) { - CacheGroupDescriptor grpDesc = desc.groupDescriptor(); - - if (!registeredGrps.containsKey(grpDesc.groupId())) - registeredGrps.put(grpDesc.groupId(), grpDesc); - } + caches.initStartedCaches(descs); if (crd && lateAffAssign) { forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { @@ -868,7 +1114,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) - initAffinity(registeredGrps.get(aff.groupId()), aff, fut); + initAffinity(caches.group(aff.groupId()), aff, fut); } }); } @@ -893,13 +1139,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } else { GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, - desc, + desc.groupId(), fut.topologyVersion(), fut.discoCache()); - fetchFut.init(); + fetchFut.init(false); - fetchAffinity(fut, aff, fetchFut); + fetchAffinity(fut.topologyVersion(), + fut.discoveryEvent(), + fut.discoCache(), + aff, fetchFut); } } @@ -990,7 +1239,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap StringBuilder names = new StringBuilder(); for (Integer grpId : grpIds) { - String name = registeredGrps.get(grpId).cacheOrGroupName(); + String name = 
caches.group(grpId).cacheOrGroupName(); if (names.length() != 0) names.append(", "); @@ -1022,16 +1271,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap grp.affinity().initialize(fut.topologyVersion(), assignment); } else { - CacheGroupDescriptor grpDesc = registeredGrps.get(grp.groupId()); + CacheGroupDescriptor grpDesc = caches.group(grp.groupId()); assert grpDesc != null : grp.cacheOrGroupName(); GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, - grpDesc, + grpDesc.groupId(), topVer, fut.discoCache()); - fetchFut.init(); + fetchFut.init(false); fetchFuts.add(fetchFut); } @@ -1042,48 +1291,57 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap Integer grpId = fetchFut.groupId(); - fetchAffinity(fut, cctx.cache().cacheGroup(grpId).affinity(), fetchFut); + fetchAffinity(fut.topologyVersion(), + fut.discoveryEvent(), + fut.discoCache(), + cctx.cache().cacheGroup(grpId).affinity(), + fetchFut); } } /** - * @param fut Exchange future. + * @param topVer Topology version. + * @param discoveryEvt Discovery event. + * @param discoCache Discovery data cache. * @param affCache Affinity. * @param fetchFut Affinity fetch future. * @throws IgniteCheckedException If failed. + * @return Affinity assignment response. 
*/ - private void fetchAffinity(GridDhtPartitionsExchangeFuture fut, + private GridDhtAffinityAssignmentResponse fetchAffinity(AffinityTopologyVersion topVer, + @Nullable DiscoveryEvent discoveryEvt, + DiscoCache discoCache, GridAffinityAssignmentCache affCache, GridDhtAssignmentFetchFuture fetchFut) throws IgniteCheckedException { assert affCache != null; - AffinityTopologyVersion topVer = fut.topologyVersion(); - GridDhtAffinityAssignmentResponse res = fetchFut.get(); if (res == null) { - List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache()); + List<List<ClusterNode>> aff = affCache.calculate(topVer, discoveryEvt, discoCache); affCache.initialize(topVer, aff); } else { - List<List<ClusterNode>> idealAff = res.idealAffinityAssignment(cctx.discovery()); + List<List<ClusterNode>> idealAff = res.idealAffinityAssignment(discoCache); if (idealAff != null) affCache.idealAssignment(idealAff); else { assert !affCache.centralizedAffinityFunction() || !lateAffAssign; - affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache()); + affCache.calculate(topVer, discoveryEvt, discoCache); } - List<List<ClusterNode>> aff = res.affinityAssignment(cctx.discovery()); + List<List<ClusterNode>> aff = res.affinityAssignment(discoCache); assert aff != null : res; affCache.initialize(topVer, aff); } + + return res; } /** @@ -1136,7 +1394,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (grp.isLocal()) continue; - initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut); + initAffinity(caches.group(grp.groupId()), grp.affinity(), fut); } } @@ -1176,7 +1434,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } ); - grpHolder = CacheGroupHolder2.create(cctx, desc, fut, null); + grpHolder = CacheGroupHolder2.create(cctx, desc, fut.topologyVersion(), null); final GridAffinityAssignmentCache aff = grpHolder.affinity(); @@ -1198,18 +1456,21 @@ public class 
CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev; GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, - desc, + desc.groupId(), prev.topologyVersion(), prev.discoCache()); - fetchFut.init(); + fetchFut.init(false); final GridFutureAdapter<AffinityTopologyVersion> affFut = new GridFutureAdapter<>(); fetchFut.listen(new IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>() { @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut) throws IgniteCheckedException { - fetchAffinity(prev, aff, (GridDhtAssignmentFetchFuture)fetchFut); + fetchAffinity(prev.topologyVersion(), + prev.discoveryEvent(), + prev.discoCache(), + aff, (GridDhtAssignmentFetchFuture)fetchFut); aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); @@ -1268,7 +1529,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } ); - cacheGrp = CacheGroupHolder2.create(cctx, desc, fut, null); + cacheGrp = CacheGroupHolder2.create(cctx, desc, fut.topologyVersion(), null); } else cacheGrp = new CacheGroupHolder1(grp, null); @@ -1749,7 +2010,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * @param cctx Context. * @param grpDesc Cache group descriptor. - * @param fut Exchange future. + * @param topVer Current exchange version. * @param initAff Current affinity. * @return Cache holder. * @throws IgniteCheckedException If failed. 
@@ -1757,7 +2018,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap static CacheGroupHolder2 create( GridCacheSharedContext cctx, CacheGroupDescriptor grpDesc, - GridDhtPartitionsExchangeFuture fut, + AffinityTopologyVersion topVer, @Nullable GridAffinityAssignmentCache initAff) throws IgniteCheckedException { assert grpDesc != null; assert !cctx.kernalContext().clientNode(); @@ -1768,7 +2029,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert ccfg.getCacheMode() != LOCAL : ccfg.getName(); assert !cctx.discovery().cacheGroupAffinityNodes(grpDesc.groupId(), - fut.topologyVersion()).contains(cctx.localNode()) : grpDesc.cacheOrGroupName(); + topVer).contains(cctx.localNode()) : grpDesc.cacheOrGroupName(); AffinityFunction affFunc = cctx.cache().clone(ccfg.getAffinity()); @@ -1872,7 +2133,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cacheWaitParts == null) { waitGrps.put(grpId, cacheWaitParts = new HashMap<>()); - deploymentIds.put(grpId, registeredGrps.get(grpId).deploymentId()); + deploymentIds.put(grpId, caches.group(grpId).deploymentId()); } cacheWaitParts.put(part, waitNode); @@ -1891,4 +2152,101 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ", grps=" + (waitGrps != null ? waitGrps.keySet() : null) + ']'; } } + + /** + * + */ + static class CachesInfo { + /** Registered cache groups (updated from exchange thread). */ + private final ConcurrentHashMap<Integer, CacheGroupDescriptor> registeredGrps = new ConcurrentHashMap<>(); + + /** Registered caches (updated from exchange thread). */ + private final ConcurrentHashMap<Integer, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>(); + + /** + * @param grps Registered groups. + * @param caches Registered caches. 
+ */ + void init(Map<Integer, CacheGroupDescriptor> grps, Map<String, DynamicCacheDescriptor> caches) { + for (CacheGroupDescriptor grpDesc : grps.values()) + registeredGrps.put(grpDesc.groupId(), grpDesc); + + for (DynamicCacheDescriptor cacheDesc : caches.values()) + registeredCaches.put(cacheDesc.cacheId(), cacheDesc); + } + + /** + * @return All registered groups. + */ + Collection<CacheGroupDescriptor> allGroups() { + return registeredGrps.values(); + } + + /** + * @param grpId Group ID. + * @return Group descriptor. + */ + CacheGroupDescriptor group(int grpId) { + CacheGroupDescriptor desc = registeredGrps.get(grpId); + + assert desc != null : grpId; + + return desc; + } + + /** + * @param descs Descriptors of started caches; groups and caches that are already registered (by ID) are left untouched. + */ + void initStartedCaches(Collection<DynamicCacheDescriptor> descs) { + for (DynamicCacheDescriptor desc : descs) { + CacheGroupDescriptor grpDesc = desc.groupDescriptor(); + + if (!registeredGrps.containsKey(grpDesc.groupId())) + registeredGrps.put(grpDesc.groupId(), grpDesc); + + if (!registeredCaches.containsKey(desc.cacheId())) + registeredCaches.put(desc.cacheId(), desc); + } + } + + /** + * @param exchActions Exchange actions. + */ + void updateCachesInfo(ExchangeActions exchActions) { + for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) { + CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId()); + + assert rmvd != null : stopDesc.cacheOrGroupName(); + } + + for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) { + CacheGroupDescriptor old = registeredGrps.put(startDesc.groupId(), startDesc); + + assert old == null : old; + } + + for (ExchangeActions.ActionData req : exchActions.cacheStopRequests()) + registeredCaches.remove(req.descriptor().cacheId()); + + for (ExchangeActions.ActionData req : exchActions.cacheStartRequests()) + registeredCaches.put(req.descriptor().cacheId(), req.descriptor()); + } + + /** + * @param cacheId Cache ID. + * @return Cache descriptor if cache found. 
+ */ + @Nullable DynamicCacheDescriptor cache(Integer cacheId) { + return registeredCaches.get(cacheId); + } + + /** + * + */ + void clear() { + registeredGrps.clear(); + + registeredCaches.clear(); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index 4844a55..196df57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -898,6 +898,9 @@ public class CacheGroupContext { res.idealAffinityAssignment(assignment.idealAssignment()); } + if (req.sendPartitionsState()) + res.partitionMap(top.partitionMap(true)); + try { ctx.io().send(nodeId, res, AFFINITY_POOL); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java new file mode 100644 index 0000000..3d120f7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * Sent from cache client node to asynchronously notify about started/closed client caches. + */ +public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteUuid id = IgniteUuid.randomUuid();; + + /** */ + @GridToStringInclude + private Map<Integer, Boolean> startedCaches; + + /** */ + @GridToStringInclude + private Set<Integer> closedCaches; + + /** Update timeout object, used to batch multiple starts/close into single discovery message. */ + private transient ClientCacheUpdateTimeout updateTimeoutObj; + + /** + * @param startedCaches Started caches. + * @param closedCaches Closed caches. 
+ */ + public ClientCacheChangeDiscoveryMessage(Map<Integer, Boolean> startedCaches, Set<Integer> closedCaches) { + this.startedCaches = startedCaches; + this.closedCaches = closedCaches; + } + + /** + * @param startedCaches Started caches. + * @param closedCaches Closed caches. + */ + public void merge(@Nullable Map<Integer, Boolean> startedCaches, @Nullable Set<Integer> closedCaches) { + Map<Integer, Boolean> startedCaches0 = this.startedCaches; + Set<Integer> closedCaches0 = this.closedCaches; + + if (startedCaches != null) { + if (startedCaches0 == null) + this.startedCaches = startedCaches0 = new HashMap<>(); + + for (Map.Entry<Integer, Boolean> e : startedCaches.entrySet()) { + if (closedCaches0 != null && closedCaches0.remove(e.getKey())) + continue; + + Boolean old = startedCaches0.put(e.getKey(), e.getValue()); + + assert old == null : e.getKey(); + } + } + + if (closedCaches != null) { + if (closedCaches0 == null) + this.closedCaches = closedCaches0 = new HashSet<>(); + + for (Integer cacheId : closedCaches) { + if (startedCaches0 != null && startedCaches0.remove(cacheId) != null) + continue; + + boolean add = closedCaches0.add(cacheId); + + assert add : cacheId; + } + } + } + + /** + * @return {@code True} if there are no info about started/closed caches. + */ + public boolean empty() { + return F.isEmpty(startedCaches) && F.isEmpty(closedCaches); + } + + /** + * @param caches Started caches' IDs. + */ + void checkCachesExist(Set<Integer> caches) { + if (closedCaches != null) { + for (Iterator<Integer> it = closedCaches.iterator(); it.hasNext();) { + Integer cacheId = it.next(); + + if (!caches.contains(cacheId)) + it.remove(); + } + } + + if (startedCaches != null) { + for (Iterator<Integer> it = startedCaches.keySet().iterator(); it.hasNext();) { + Integer cacheId = it.next(); + + if (!caches.contains(cacheId)) + it.remove(); + } + } + } + + /** + * @return Update timeout object. 
+ */ + public ClientCacheUpdateTimeout updateTimeoutObject() { + return updateTimeoutObj; + } + + /** + * @param updateTimeoutObj Update timeout object. + */ + public void updateTimeoutObject(ClientCacheUpdateTimeout updateTimeoutObj) { + this.updateTimeoutObj = updateTimeoutObj; + } + + /** + * @return Started caches map (cache ID to near enabled flag). + */ + @Nullable public Map<Integer, Boolean> startedCaches() { + return startedCaches; + } + + /** + * @return Closed caches. + */ + @Nullable public Set<Integer> closedCaches() { + return closedCaches; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClientCacheChangeDiscoveryMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java new file mode 100644 index 0000000..68bca27 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * Dummy discovery message which is not really sent via ring, it is just added in local discovery worker queue. + */ +public class ClientCacheChangeDummyDiscoveryMessage implements DiscoveryCustomMessage, + CachePartitionExchangeWorkerTask { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final UUID reqId; + + /** */ + private final Map<String, DynamicCacheChangeRequest> startReqs; + + /** */ + @GridToStringInclude + private final Set<String> cachesToClose; + + /** + * @param reqId Start request ID. + * @param startReqs Caches start requests. + * @param cachesToClose Cache to close. 
+ */ + public ClientCacheChangeDummyDiscoveryMessage(UUID reqId, + @Nullable Map<String, DynamicCacheChangeRequest> startReqs, + @Nullable Set<String> cachesToClose) { + assert reqId != null; + assert startReqs != null ^ cachesToClose != null; + + this.reqId = reqId; + this.startReqs = startReqs; + this.cachesToClose = cachesToClose; + } + + /** + * @return Start request ID. + */ + UUID requestId() { + return reqId; + } + + /** + * @return Cache start requests. + */ + @Nullable Map<String, DynamicCacheChangeRequest> startRequests() { + return startReqs; + } + + /** + * @return Client caches to close. + */ + Set<String> cachesToClose() { + return cachesToClose; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClientCacheChangeDummyDiscoveryMessage.class, this, + "startCaches", (startReqs != null ? startReqs.keySet() : "")); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java new file mode 100644 index 0000000..aab3a3e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; + +/** + * + */ +class ClientCacheUpdateTimeout extends GridTimeoutObjectAdapter implements CachePartitionExchangeWorkerTask { + /** */ + private final GridCacheSharedContext cctx; + + /** + * @param cctx Context. + * @param timeout Timeout. 
+ */ + ClientCacheUpdateTimeout(GridCacheSharedContext cctx, long timeout) { + super(timeout); + + this.cctx = cctx; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + if (!cctx.kernalContext().isStopping()) + cctx.exchange().addCustomTask(this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index d5718f8..7398074 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -280,6 +280,40 @@ class ClusterCachesInfo { } /** + * @param msg Message. + * @param node Node sent message. + */ + void onClientCacheChange(ClientCacheChangeDiscoveryMessage msg, ClusterNode node) { + Map<Integer, Boolean> startedCaches = msg.startedCaches(); + + if (startedCaches != null) { + for (Map.Entry<Integer, Boolean> e : startedCaches.entrySet()) { + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + if (e.getKey().equals(desc.cacheId())) { + ctx.discovery().addClientNode(desc.cacheName(), node.id(), e.getValue()); + + break; + } + } + } + } + + Set<Integer> closedCaches = msg.closedCaches(); + + if (closedCaches != null) { + for (Integer cacheId : closedCaches) { + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + if (cacheId.equals(desc.cacheId())) { + ctx.discovery().onClientCacheClose(desc.cacheName(), node.id()); + + break; + } + } + } + } + } + + /** * @param batch Cache change request. * @param topVer Topology version. * @return {@code True} if minor topology version should be increased. 
@@ -325,10 +359,14 @@ class ClusterCachesInfo { continue; } + assert !req.clientStartOnly() : req; + DynamicCacheDescriptor desc = req.globalStateChange() ? null : registeredCaches.get(req.cacheName()); boolean needExchange = false; + boolean clientCacheStart = false; + AffinityTopologyVersion waitTopVer = null; if (req.start()) { @@ -388,6 +426,7 @@ class ClusterCachesInfo { assert old == null; ctx.discovery().setCacheFilter( + startDesc.cacheId(), grpDesc.groupId(), ccfg.getName(), ccfg.getNearConfiguration() != null); @@ -406,40 +445,35 @@ class ClusterCachesInfo { else { assert req.initiatingNodeId() != null : req; - // Cache already exists, exchange is needed only if client cache should be created. - ClusterNode node = ctx.discovery().node(req.initiatingNodeId()); - - boolean clientReq = node != null && - !ctx.discovery().cacheAffinityNode(node, req.cacheName()); - - if (req.clientStartOnly()) { - needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(), - req.initiatingNodeId(), - req.nearCacheConfiguration() != null); + if (req.failIfExists()) { + ctx.cache().completeCacheStartFuture(req, false, + new CacheExistsException("Failed to start cache " + + "(a cache with the same name is already started): " + req.cacheName())); } else { - if (req.failIfExists()) { - ctx.cache().completeCacheStartFuture(req, false, - new CacheExistsException("Failed to start cache " + - "(a cache with the same name is already started): " + req.cacheName())); - } - else { - needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(), + // Cache already exists, it is possible client cache is needed. 
+ ClusterNode node = ctx.discovery().node(req.initiatingNodeId()); + + boolean clientReq = node != null && + !ctx.discovery().cacheAffinityNode(node, req.cacheName()); + + if (clientReq) { + ctx.discovery().addClientNode(req.cacheName(), req.initiatingNodeId(), req.nearCacheConfiguration() != null); - } - } - if (needExchange) { - req.clientStartOnly(true); + if (node.id().equals(req.initiatingNodeId())) { + desc.clientCacheStartVersion(topVer); - desc.clientCacheStartVersion(topVer.nextMinorVersion()); + clientCacheStart = true; - exchangeActions.addClientCacheToStart(req, desc); + ctx.discovery().clientCacheStartEvent(req.requestId(), F.asMap(req.cacheName(), req), null); + } + } } } - if (!needExchange && desc != null) { + if (!needExchange && !clientCacheStart && desc != null) { if (desc.clientCacheStartVersion() != null) waitTopVer = desc.clientCacheStartVersion(); else { @@ -508,19 +542,11 @@ class ClusterCachesInfo { } } } - else if (req.close()) { - if (desc != null) { - needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId()); - - if (needExchange) - exchangeActions.addCacheToClose(req, desc); - } - } else assert false : req; if (!needExchange) { - if (req.initiatingNodeId().equals(ctx.localNodeId())) + if (!clientCacheStart && req.initiatingNodeId().equals(ctx.localNodeId())) reqsToComplete.add(new T2<>(req, waitTopVer)); } else @@ -849,6 +875,7 @@ class ClusterCachesInfo { registeredCaches.put(cacheData.cacheConfiguration().getName(), desc); ctx.discovery().setCacheFilter( + desc.cacheId(), grpDesc.groupId(), cfg.getName(), cfg.getNearConfiguration() != null); @@ -1082,6 +1109,7 @@ class ClusterCachesInfo { joinData.cacheDeploymentId()); ctx.discovery().setCacheFilter( + cacheId, grpDesc.groupId(), cfg.getName(), cfg.getNearConfiguration() != null); http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index 5434061..c6da43f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -67,9 +67,6 @@ public class DynamicCacheChangeRequest implements Serializable { /** Destroy. */ private boolean destroy; - /** Close flag. */ - private boolean close; - /** Whether cache was created through SQL. */ private boolean sql; @@ -155,19 +152,6 @@ public class DynamicCacheChangeRequest implements Serializable { /** * @param ctx Context. * @param cacheName Cache name. - * @return Request to close client cache. - */ - static DynamicCacheChangeRequest closeRequest(GridKernalContext ctx, String cacheName) { - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId()); - - req.close(true); - - return req; - } - - /** - * @param ctx Context. - * @param cacheName Cache name. * @param sql {@code true} if the cache must be stopped only if it was created by SQL command {@code CREATE TABLE}. * @param destroy Destroy flag. * @return Cache stop request. @@ -372,20 +356,6 @@ public class DynamicCacheChangeRequest implements Serializable { } /** - * @return Close flag. - */ - public boolean close() { - return close; - } - - /** - * @param close New close flag. - */ - public void close(boolean close) { - this.close = close; - } - - /** * @return SQL flag. 
*/ public boolean sql() { http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java index d577b30..31546be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java @@ -43,15 +43,9 @@ public class ExchangeActions { private Map<String, ActionData> cachesToStart; /** */ - private Map<String, ActionData> clientCachesToStart; - - /** */ private Map<String, ActionData> cachesToStop; /** */ - private Map<String, ActionData> cachesToClose; - - /** */ private Map<String, ActionData> cachesToResetLostParts; /** */ @@ -60,7 +54,7 @@ public class ExchangeActions { /** * @return {@code True} if server nodes should not participate in exchange. */ - boolean clientOnlyExchange() { + public boolean clientOnlyExchange() { return F.isEmpty(cachesToStart) && F.isEmpty(cachesToStop) && F.isEmpty(cacheGrpsToStart) && @@ -69,27 +63,6 @@ public class ExchangeActions { } /** - * @param nodeId Local node ID. - * @return Close cache requests. - */ - List<ActionData> closeRequests(UUID nodeId) { - List<ActionData> res = null; - - if (cachesToClose != null) { - for (ActionData req : cachesToClose.values()) { - if (nodeId.equals(req.req.initiatingNodeId())) { - if (res == null) - res = new ArrayList<>(cachesToClose.size()); - - res.add(req); - } - } - } - - return res != null ? res : Collections.<ActionData>emptyList(); - } - - /** * @return New caches start requests. 
*/ Collection<ActionData> cacheStartRequests() { @@ -97,25 +70,6 @@ public class ExchangeActions { } /** - * @return Start cache requests. - */ - Collection<ActionData> newAndClientCachesStartRequests() { - if (cachesToStart != null || clientCachesToStart != null) { - List<ActionData> res = new ArrayList<>(); - - if (cachesToStart != null) - res.addAll(cachesToStart.values()); - - if (clientCachesToStart != null) - res.addAll(clientCachesToStart.values()); - - return res; - } - - return Collections.emptyList(); - } - - /** * @return Stop cache requests. */ Collection<ActionData> cacheStopRequests() { @@ -128,8 +82,6 @@ public class ExchangeActions { public void completeRequestFutures(GridCacheSharedContext ctx) { completeRequestFutures(cachesToStart, ctx); completeRequestFutures(cachesToStop, ctx); - completeRequestFutures(cachesToClose, ctx); - completeRequestFutures(clientCachesToStart, ctx); completeRequestFutures(cachesToResetLostParts, ctx); } @@ -194,21 +146,6 @@ public class ExchangeActions { } /** - * @param nodeId Local node ID. - * @return {@code True} if client cache was started. - */ - public boolean clientCacheStarted(UUID nodeId) { - if (clientCachesToStart != null) { - for (ActionData cache : clientCachesToStart.values()) { - if (nodeId.equals(cache.req.initiatingNodeId())) - return true; - } - } - - return false; - } - - /** * @param state New cluster state. */ void newClusterState(ClusterState state) { @@ -260,16 +197,6 @@ public class ExchangeActions { * @param req Request. * @param desc Cache descriptor. */ - void addClientCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) { - assert req.start() : req; - - clientCachesToStart = add(clientCachesToStart, req, desc); - } - - /** - * @param req Request. - * @param desc Cache descriptor. - */ void addCacheToStop(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) { assert req.stop() : req; @@ -280,16 +207,6 @@ public class ExchangeActions { * @param req Request. 
* @param desc Cache descriptor. */ - void addCacheToClose(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) { - assert req.close() : req; - - cachesToClose = add(cachesToClose, req, desc); - } - - /** - * @param req Request. - * @param desc Cache descriptor. - */ void addCacheToResetLostPartitions(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) { assert req.resetLostPartitions() : req; @@ -369,9 +286,7 @@ public class ExchangeActions { */ public boolean empty() { return F.isEmpty(cachesToStart) && - F.isEmpty(clientCachesToStart) && F.isEmpty(cachesToStop) && - F.isEmpty(cachesToClose) && F.isEmpty(cacheGrpsToStart) && F.isEmpty(cacheGrpsToStop) && F.isEmpty(cachesToResetLostParts); http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 839ddbd..d753f99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -829,14 +829,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @return Partition topology. */ public GridDhtPartitionTopology topology() { - GridCacheAdapter<K, V> cache = this.cache; - - if (cache == null) - throw new IllegalStateException("Cache stopped: " + cacheName); - - assert cache.isNear() || cache.isDht() || cache.isColocated() || cache.isDhtAtomic() : cache; - - return topology(cache); + return grp.topology(); } /**
