Repository: ignite
Updated Branches:
  refs/heads/ignite-1.3.3-p7 c1b537fa5 -> 81e0c401f
ignite-993 Update registeredCaches map only from discovery thread Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33216453 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33216453 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33216453 Branch: refs/heads/ignite-1.3.3-p7 Commit: 3321645392e25a2d0ed5e469de57d6d1a2dc173d Parents: c1b537f Author: sboikov <sboi...@gridgain.com> Authored: Tue Sep 1 15:02:33 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Sep 1 15:21:54 2015 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 50 +++-- .../cache/DynamicCacheChangeRequest.java | 17 ++ .../cache/DynamicCacheDescriptor.java | 14 -- .../GridCachePartitionExchangeManager.java | 2 +- .../processors/cache/GridCacheProcessor.java | 187 +++++++++---------- .../cache/IgniteDynamicCacheStartSelfTest.java | 2 - 6 files changed, 137 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index fac6f6d..ce5aca9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -245,7 +245,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param cacheName Cache name. 
*/ public void removeCacheFilter(String cacheName) { - registeredCaches.remove(cacheName); + CachePredicate p = registeredCaches.remove(cacheName); + + assert p != null : cacheName; } /** @@ -254,12 +256,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param cacheName Cache name. * @param clientNodeId Near node ID. * @param nearEnabled Near enabled flag. + * @return {@code True} if new node ID was added. */ - public void addClientNode(String cacheName, UUID clientNodeId, boolean nearEnabled) { - CachePredicate pred = registeredCaches.get(cacheName); + public boolean addClientNode(String cacheName, UUID clientNodeId, boolean nearEnabled) { + CachePredicate p = registeredCaches.get(cacheName); - if (pred != null) - pred.addClientNode(clientNodeId, nearEnabled); + assert p != null : cacheName; + + return p.addClientNode(clientNodeId, nearEnabled); } /** @@ -267,12 +271,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * * @param cacheName Cache name. * @param clientNodeId Near node ID. + * @return {@code True} if existing node ID was removed. */ - public void onClientCacheClose(String cacheName, UUID clientNodeId) { - CachePredicate predicate = registeredCaches.get(cacheName); + public boolean onClientCacheClose(String cacheName, UUID clientNodeId) { + CachePredicate p = registeredCaches.get(cacheName); + + assert p != null : cacheName; - if (predicate != null) - predicate.onNodeLeft(clientNodeId); + return p.onNodeLeft(clientNodeId); } /** @@ -2621,16 +2627,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { */ private static class CachePredicate { /** Cache filter. */ - private IgnitePredicate<ClusterNode> cacheFilter; + private final IgnitePredicate<ClusterNode> cacheFilter; /** If near cache is enabled on data nodes. */ - private boolean nearEnabled; + private final boolean nearEnabled; /** Flag indicating if cache is local. 
*/ - private boolean loc; + private final boolean loc; /** Collection of client near nodes. */ - private ConcurrentHashMap<UUID, Boolean> clientNodes; + private final ConcurrentHashMap<UUID, Boolean> clientNodes; /** * @param cacheFilter Cache filter. @@ -2650,16 +2656,26 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * @param nodeId Near node ID to add. * @param nearEnabled Near enabled flag. + * @return {@code True} if new node ID was added. */ - public void addClientNode(UUID nodeId, boolean nearEnabled) { - clientNodes.putIfAbsent(nodeId, nearEnabled); + public boolean addClientNode(UUID nodeId, boolean nearEnabled) { + assert nodeId != null; + + Boolean old = clientNodes.putIfAbsent(nodeId, nearEnabled); + + return old == null; } /** * @param leftNodeId Left node ID. + * @return {@code True} if existing node ID was removed. */ - public void onNodeLeft(UUID leftNodeId) { - clientNodes.remove(leftNodeId); + public boolean onNodeLeft(UUID leftNodeId) { + assert leftNodeId != null; + + Boolean old = clientNodes.remove(leftNodeId); + + return old != null; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index 7af1572..2029a95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -66,6 +66,9 @@ public class DynamicCacheChangeRequest implements Serializable { /** Template configuration flag. 
*/ private boolean template; + /** */ + private transient boolean exchangeNeeded; + /** * Constructor creates cache stop request. * @@ -78,6 +81,20 @@ public class DynamicCacheChangeRequest implements Serializable { } /** + * @return {@code True} if request should trigger partition exchange. + */ + public boolean exchangeNeeded() { + return exchangeNeeded; + } + + /** + * @param exchangeNeeded {@code True} if request should trigger partition exchange. + */ + public void exchangeNeeded(boolean exchangeNeeded) { + this.exchangeNeeded = exchangeNeeded; + } + + /** * @param template {@code True} if this is request for adding template configuration. */ public void template(boolean template) { http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index 9c6cc43..f68e920 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -168,20 +168,6 @@ public class DynamicCacheDescriptor { } /** - * Sets cancelled flag. - */ - public void onCancelled() { - cancelled = true; - } - - /** - * @return Cancelled flag. - */ - public boolean cancelled() { - return cancelled; - } - - /** * @param nodeId Remote node ID. * @return Configuration. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 4398b4c..5b6f750 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -156,7 +156,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana // Validate requests to check if event should trigger partition exchange. for (DynamicCacheChangeRequest req : batch.requests()) { - if (cctx.cache().exchangeNeeded(req)) + if (req.exchangeNeeded()) valid.add(req); else cctx.cache().completeStartFuture(req); http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index f13af23..c1d0d17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1405,36 +1405,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * @param req Request to check. - * @return {@code True} if change request was registered to apply. 
- */ - @SuppressWarnings("IfMayBeConditional") - public boolean exchangeNeeded(DynamicCacheChangeRequest req) { - DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); - - if (desc != null) { - if (req.close()) { - assert req.initiatingNodeId() != null : req; - - return true; - } - - if (desc.deploymentId().equals(req.deploymentId())) { - if (req.start()) - return !desc.cancelled(); - else - return desc.cancelled(); - } - - // If client requested cache start - if (req.initiatingNodeId() != null) - return true; - } - - return false; - } - - /** * @param reqs Requests to start. * @param topVer Topology version. * @throws IgniteCheckedException If failed to start cache. @@ -1622,11 +1592,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { stopGateway(req); prepareCacheStop(req); - - DynamicCacheDescriptor desc = registeredCaches.get(masked); - - if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId())) - registeredCaches.remove(masked, desc); } else if (req.close() && req.initiatingNodeId().equals(ctx.localNodeId())) { IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(masked); @@ -1709,17 +1674,15 @@ public class GridCacheProcessor extends GridProcessorAdapter { new ArrayList<>(registeredCaches.size() + registeredTemplates.size()); for (DynamicCacheDescriptor desc : registeredCaches.values()) { - if (!desc.cancelled()) { - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null); + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null); - req.startCacheConfiguration(desc.cacheConfiguration()); + req.startCacheConfiguration(desc.cacheConfiguration()); - req.cacheType(desc.cacheType()); + req.cacheType(desc.cacheType()); - req.deploymentId(desc.deploymentId()); + req.deploymentId(desc.deploymentId()); - reqs.add(req); - } + reqs.add(req); } for (DynamicCacheDescriptor desc : 
registeredTemplates.values()) { @@ -1980,10 +1943,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { return new GridFinishedFuture<>(e); } - if (desc != null && !desc.cancelled()) { - if (failIfExists) + if (desc != null) { + if (failIfExists) { return new GridFinishedFuture<>(new CacheExistsException("Failed to start cache " + "(a cache with the same name is already started): " + cacheName)); + } else { CacheConfiguration descCfg = desc.cacheConfiguration(); @@ -2029,7 +1993,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { else { req.clientStartOnly(true); - if (desc != null && !desc.cancelled()) + if (desc != null) ccfg = desc.cacheConfiguration(); if (ccfg == null) { @@ -2212,83 +2176,104 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); - if (req.start()) { - CacheConfiguration ccfg = req.startCacheConfiguration(); + boolean needExchange = false; - DynamicCacheStartFuture startFut = (DynamicCacheStartFuture)pendingFuts.get( - maskNull(ccfg.getName())); + DynamicCacheStartFuture fut = null; - // Check if cache with the same name was concurrently started form different node. - if (desc != null) { - if (!req.clientStartOnly() && req.failIfExists()) { - // If local node initiated start, fail the start future. 
- if (startFut != null && startFut.deploymentId().equals(req.deploymentId())) { - startFut.onDone(new CacheExistsException("Failed to start cache " + - "(a cache with the same name is already started): " + U.maskName(ccfg.getName()))); - } + if (ctx.localNodeId().equals(req.initiatingNodeId())) { + fut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName())); - return; + if (!req.deploymentId().equals(fut.deploymentId())) + fut = null; + } + + if (req.start()) { + if (desc == null) { + if (req.clientStartOnly()) { + if (fut != null) + fut.onDone(new IgniteCheckedException("Failed to start client cache " + + "(a cache with the given name is not started): " + U.maskName(req.cacheName()))); } + else { + CacheConfiguration ccfg = req.startCacheConfiguration(); + + assert req.cacheType() != null : req; + assert F.eq(ccfg.getName(), req.cacheName()) : req; - req.clientStartOnly(true); + DynamicCacheDescriptor startDesc = + new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId()); + + DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc); + + assert old == null : + "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']'; + + ctx.discovery().setCacheFilter( + ccfg.getName(), + ccfg.getNodeFilter(), + ccfg.getNearConfiguration() != null, + ccfg.getCacheMode() == LOCAL); + + ctx.discovery().addClientNode(req.cacheName(), + req.initiatingNodeId(), + req.nearCacheConfiguration() != null); + + needExchange = true; + } } else { if (req.clientStartOnly()) { - if (startFut != null && startFut.deploymentId().equals(req.deploymentId())) { - startFut.onDone(new IgniteCheckedException("Failed to start client cache " + - "(a cache with the given name is not started): " + U.maskName(ccfg.getName()))); - } + assert req.initiatingNodeId() != null : req; - return; + needExchange = ctx.discovery().addClientNode(req.cacheName(), + req.initiatingNodeId(), + req.nearCacheConfiguration() != 
null); } - } - - if (!req.clientStartOnly() && desc == null) { - assert req.cacheType() != null : req; - - DynamicCacheDescriptor startDesc = - new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId()); + else { + if (req.failIfExists() ) { + if (fut != null) + fut.onDone(new CacheExistsException("Failed to start cache " + + "(a cache with the same name is already started): " + U.maskName(req.cacheName()))); + } + else { + // Cache already exists, exchange is needed only if client cache should be created. + ClusterNode node = ctx.discovery().node(req.initiatingNodeId()); - DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc); + boolean clientReq = node != null && + !ctx.discovery().cacheAffinityNode(node, req.cacheName()); - assert old == null : - "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']'; + needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(), + req.initiatingNodeId(), + req.nearCacheConfiguration() != null); - ctx.discovery().setCacheFilter( - ccfg.getName(), - ccfg.getNodeFilter(), - ccfg.getNearConfiguration() != null, - ccfg.getCacheMode() == LOCAL); + if (needExchange) + req.clientStartOnly(true); + } + } } - - ctx.discovery().addClientNode(req.cacheName(), - req.initiatingNodeId(), - req.nearCacheConfiguration() != null); } else { - assert req.stop() || req.close() : req; + assert req.stop() ^ req.close() : req; - if (desc == null) { - // If local node initiated start, finish future. - DynamicCacheStartFuture changeFut = - (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName())); + if (desc != null) { + if (req.stop()) { + DynamicCacheDescriptor old = registeredCaches.remove(maskNull(req.cacheName())); - if (changeFut != null && changeFut.deploymentId().equals(req.deploymentId())) { - // No-op. 
- changeFut.onDone(); - } + assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']'; - return; - } + ctx.discovery().removeCacheFilter(req.cacheName()); - if (req.stop()) { - desc.onCancelled(); + needExchange = true; + } + else { + assert req.close() : req; - ctx.discovery().removeCacheFilter(req.cacheName()); + needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId()); + } } - else - ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId()); } + + req.exchangeNeeded(needExchange); } } @@ -2711,7 +2696,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc = registeredCaches.get(maskNull(name)); - if (desc == null || desc.cancelled()) + if (desc == null) throw new IllegalArgumentException("Cache is not started: " + name); if (!desc.cacheType().userCache()) @@ -2779,7 +2764,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { public CacheConfiguration cacheConfiguration(String name) { DynamicCacheDescriptor desc = registeredCaches.get(maskNull(name)); - if (desc == null || desc.cancelled()) + if (desc == null) throw new IllegalStateException("Cache doesn't exist: " + name); else return desc.cacheConfiguration(); http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index d1f8016..95f7701 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ 
-1118,8 +1118,6 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testStartStopSameCacheMultinode() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-993"); - final AtomicInteger idx = new AtomicInteger(); IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {