ignite-45 - Fixing tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f68fa57a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f68fa57a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f68fa57a Branch: refs/heads/ignite-45 Commit: f68fa57ac285606f86a6214f8f0f8dbe64d7fba9 Parents: e37457a Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Sun Mar 15 13:17:56 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Sun Mar 15 13:17:56 2015 -0700 ---------------------------------------------------------------------- .../cache/DynamicCacheChangeRequest.java | 80 +++++---- .../processors/cache/GridCacheProcessor.java | 165 ++++++++++--------- .../GridDhtPartitionsExchangeFuture.java | 8 +- .../cache/IgniteDynamicCacheStartSelfTest.java | 89 +++++++++- 4 files changed, 222 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f68fa57a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index d4b4228..be88217 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -43,23 +43,13 @@ public class DynamicCacheChangeRequest implements Serializable { private CacheConfiguration startCfg; /** Near node ID in case if near cache is being started. */ - private UUID clientNodeId; + private UUID initiatingNodeId; /** Near cache configuration. */ private NearCacheConfiguration nearCacheCfg; - /** - * Constructor creates cache start request. - * - * @param startCfg Start cache configuration. - */ - public DynamicCacheChangeRequest( - CacheConfiguration startCfg - ) { - this.startCfg = startCfg; - - deploymentId = IgniteUuid.randomUuid(); - } + /** Start only client cache, do not start data nodes. */ + private boolean clientStartOnly; /** * Constructor creates cache stop request. @@ -73,14 +63,12 @@ public class DynamicCacheChangeRequest implements Serializable { /** * Constructor creates near cache start request. * - * @param clientNodeId Client node ID. - * @param startCfg Start cache configuration. - * @param nearCacheCfg Near cache configuration. + * @param initiatingNodeId Initiating node ID. */ - public DynamicCacheChangeRequest(UUID clientNodeId, CacheConfiguration startCfg, NearCacheConfiguration nearCacheCfg) { - this.clientNodeId = clientNodeId; - this.startCfg = startCfg; - this.nearCacheCfg = nearCacheCfg; + public DynamicCacheChangeRequest( + UUID initiatingNodeId + ) { + this.initiatingNodeId = initiatingNodeId; } /** @@ -101,21 +89,14 @@ public class DynamicCacheChangeRequest implements Serializable { * @return {@code True} if this is a start request. */ public boolean isStart() { - return clientNodeId == null && startCfg != null; - } - - /** - * @return If this is a near cache start request. - */ - public boolean isClientStart() { - return clientNodeId != null; + return startCfg != null; } /** * @return {@code True} if this is a stop request. */ public boolean isStop() { - return clientNodeId == null && startCfg == null; + return initiatingNodeId == null && startCfg == null; } /** @@ -126,26 +107,61 @@ public class DynamicCacheChangeRequest implements Serializable { } /** + * @param cacheName Cache name. + */ + public void cacheName(String cacheName) { + this.cacheName = cacheName; + } + + /** * @return Near node ID. */ - public UUID clientNodeId() { - return clientNodeId; + public UUID initiatingNodeId() { + return initiatingNodeId; } /** * @return Near cache configuration. */ - public NearCacheConfiguration nearCacheCfg() { + public NearCacheConfiguration nearCacheConfiguration() { return nearCacheCfg; } /** + * @param nearCacheCfg Near cache configuration. + */ + public void nearCacheConfiguration(NearCacheConfiguration nearCacheCfg) { + this.nearCacheCfg = nearCacheCfg; + } + + /** * @return Cache configuration. */ public CacheConfiguration startCacheConfiguration() { return startCfg; } + /** + * @param startCfg Cache configuration. + */ + public void startCacheConfiguration(CacheConfiguration startCfg) { + this.startCfg = startCfg; + } + + /** + * @return Client start only. + */ + public boolean clientStartOnly() { + return clientStartOnly; + } + + /** + * @param clientStartOnly Client start only. + */ + public void clientStartOnly(boolean clientStartOnly) { + this.clientStartOnly = clientStartOnly; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", cacheName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f68fa57a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index ad23b14..ea0afd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -311,7 +311,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { "CacheRendezvousAffinityFunction cannot be set [cacheName=" + cc.getName() + ']'); } - if (cc.getNearConfiguration() != null) { + if (cc.getNearConfiguration() != null && + ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) { U.warn(log, "Near cache cannot be used with REPLICATED cache, " + "will be ignored [cacheName=" + cc.getName() + ']'); @@ -1229,7 +1230,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); if (desc != null && desc.deploymentId().equals(req.deploymentId())) { - if (req.isStart() || req.isClientStart()) + if (req.isStart()) return !desc.cancelled(); else return desc.cancelled(); @@ -1242,36 +1243,22 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param req Start request. */ public void prepareCacheStart(DynamicCacheChangeRequest req) throws IgniteCheckedException { - assert req.isStart() || req.isClientStart(); + assert req.isStart(); - CacheConfiguration ccfg = req.startCacheConfiguration(); + CacheConfiguration ccfg = new CacheConfiguration(req.startCacheConfiguration()); IgnitePredicate nodeFilter = ccfg.getNodeFilter(); ClusterNode locNode = ctx.discovery().localNode(); if (req.isStart()) { - if (nodeFilter.apply(locNode)) { - CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(null, ccfg.getName(), ccfg); - - GridCacheContext cacheCtx = createCache(ccfg, cacheObjCtx); - - cacheCtx.dynamicDeploymentId(req.deploymentId()); - - sharedCtx.addCacheContext(cacheCtx); - - startCache(cacheCtx.cache()); - onKernalStart(cacheCtx.cache()); - - caches.put(maskNull(cacheCtx.name()), cacheCtx.cache()); - } - } - else if (req.isClientStart()) { - if (req.clientNodeId().equals(locNode.id())) { - if (nodeFilter.apply(locNode)) { - U.warn(log, "Requested to start client cache on affinity node (will ignore): " + req); + boolean affNodeStart = !req.clientStartOnly() && nodeFilter.apply(locNode); + boolean clientNodeStart = locNode.id().equals(req.initiatingNodeId()); - return; + if (affNodeStart || clientNodeStart) { + if (clientNodeStart && !affNodeStart) { + if (req.nearCacheConfiguration() != null) + ccfg.setNearConfiguration(req.nearCacheConfiguration()); } CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(null, ccfg.getName(), ccfg); @@ -1333,7 +1320,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { public void onExchangeDone(DynamicCacheChangeRequest req) { String masked = maskNull(req.cacheName()); - if (req.isStart() || req.isClientStart()) { + if (req.isStart()) { GridCacheAdapter<?, ?> cache = caches.get(masked); if (cache != null) @@ -1395,7 +1382,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (DynamicCacheDescriptor desc : registeredCaches.values()) { if (!desc.cancelled() && desc.valid()) { - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration()); + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest((UUID)null); + + req.startCacheConfiguration(desc.cacheConfiguration()); req.deploymentId(desc.deploymentId()); @@ -1415,7 +1404,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName())); try { - if (req.isStart()) { + if (req.isStart() && !req.clientStartOnly()) { CacheConfiguration ccfg = req.startCacheConfiguration(); if (existing != null) { @@ -1455,9 +1444,21 @@ public class GridCacheProcessor extends GridProcessorAdapter { @Nullable CacheConfiguration ccfg, @Nullable NearCacheConfiguration nearCfg ) { - Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(2); + assert ccfg != null || nearCfg != null; + + String cacheName = ccfg == null ? nearCfg.getName() : ccfg.getName(); + + DynamicCacheDescriptor desc = registeredCaches.get(maskNull(cacheName)); + + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(ctx.localNodeId()); if (ccfg != null) { + if (desc != null) + return new GridFinishedFuture<>(new IgniteCheckedException("Failed to start cache " + + "(a cache with the same name is already started): " + cacheName)); + + req.deploymentId(IgniteUuid.randomUuid()); + try { CacheConfiguration cfg = new CacheConfiguration(ccfg); @@ -1465,40 +1466,33 @@ public class GridCacheProcessor extends GridProcessorAdapter { initialize(cfg, cacheObjCtx); - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cfg); - - reqs.add(req); + req.startCacheConfiguration(cfg); } catch (IgniteCheckedException e) { return new GridFinishedFuture(e); } - } - if (nearCfg != null) { - if (ccfg == null) { - DynamicCacheDescriptor desc = registeredCaches.get(maskNull(nearCfg.getName())); + if (nearCfg != null) + nearCfg.setName(ccfg.getName()); + } + else { + req.clientStartOnly(true); - if (desc != null && !desc.cancelled()) - ccfg = desc.cacheConfiguration(); - } + if (desc != null && !desc.cancelled()) + ccfg = desc.cacheConfiguration(); if (ccfg == null) return new GridFinishedFuture<>(new IgniteCheckedException("Failed to start near cache " + "(a cache with the given name is not started): " + nearCfg.getName())); - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(ctx.localNodeId(), ccfg, nearCfg); - - reqs.add(req); + req.deploymentId(desc.deploymentId()); + req.startCacheConfiguration(ccfg); } - GridCompoundFuture fut = new GridCompoundFuture(); - - for (DynamicCacheStartFuture startFut : initiateCacheChanges(reqs)) - fut.add(startFut); - - fut.markInitialized(); + if (nearCfg != null) + req.nearCacheConfiguration(nearCfg); - return fut; + return F.first(initiateCacheChanges(F.asList(req))); } /** @@ -1522,13 +1516,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req.deploymentId()); try { - if (req.isStart()) { - if (registeredCaches.containsKey(maskNull(req.cacheName()))) { - fut.onDone(new IgniteCheckedException("Failed to start cache " + - "(a cache with the same name is already started): " + req.cacheName())); - } - } - else if (!req.isClientStart()) { + if (req.isStop()) { DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); if (desc == null) @@ -1552,7 +1540,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { maskNull(req.cacheName()), fut); if (old != null) { - if (req.isStart()) { + if (req.isStart() && !req.clientStartOnly()) { fut.onDone(new IgniteCheckedException("Failed to start cache " + "(a cache with the same name is already being started or stopped): " + req.cacheName())); } @@ -1593,41 +1581,49 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (req.isStart()) { CacheConfiguration ccfg = req.startCacheConfiguration(); + DynamicCacheStartFuture startFut = (DynamicCacheStartFuture)pendingFuts.get( + maskNull(ccfg.getName())); + // Check if cache with the same name was concurrently started form different node. if (desc != null) { - // If local node initiated start, fail the start future. - DynamicCacheStartFuture startFut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(ccfg.getName())); + if (!req.clientStartOnly()) { + // If local node initiated start, fail the start future. + if (startFut != null && startFut.deploymentId().equals(req.deploymentId())) { + startFut.onDone(new IgniteCheckedException("Failed to start cache " + + "(a cache with the same name is already started): " + ccfg.getName())); + } - if (startFut != null && startFut.deploymentId().equals(req.deploymentId())) { - startFut.onDone(new IgniteCheckedException("Failed to start cache " + - "(a cache with the same name is already started): " + ccfg.getName())); + return; } + } + else { + if (req.clientStartOnly()) { + if (startFut != null && startFut.deploymentId().equals(req.deploymentId())) { + startFut.onDone(new IgniteCheckedException("Failed to start client cache " + + "(a cache with the given name is not started): " + ccfg.getName())); + } - return; + return; + } } - DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ccfg, req.deploymentId()); + if (!req.clientStartOnly()) { + DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ccfg, req.deploymentId()); - DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc); + DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc); - ctx.discovery().setCacheFilter( - ccfg.getName(), - ccfg.getNodeFilter(), - ccfg.getNearConfiguration() != null, - ccfg.getCacheMode() == LOCAL); + assert old == null : + "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']'; - assert old == null : - "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']'; - } - else if (req.isClientStart()) { - if (desc != null) { - if (req.nearCacheCfg() != null) - ctx.discovery().addNearNode(req.cacheName(), req.clientNodeId()); - } - else { - if (log.isDebugEnabled()) - log.debug("Will not start client cache since cache is not registered: " + req.cacheName()); + ctx.discovery().setCacheFilter( + ccfg.getName(), + ccfg.getNodeFilter(), + ccfg.getNearConfiguration() != null, + ccfg.getCacheMode() == LOCAL); } + + if (req.nearCacheConfiguration() != null) + ctx.discovery().addNearNode(req.cacheName(), req.initiatingNodeId()); } else { if (desc == null) { @@ -2082,11 +2078,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (desc == null || desc.cancelled()) throw new IllegalArgumentException("Cache is not started: " + name); - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(ctx.localNodeId(), - desc.cacheConfiguration(), null); + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(ctx.localNodeId()); + + req.cacheName(name); req.deploymentId(desc.deploymentId()); + req.startCacheConfiguration(desc.cacheConfiguration()); + + req.clientStartOnly(true); + F.first(initiateCacheChanges(F.asList(req))).get(); cache = (IgniteCache<K, V>)jCacheProxies.get(masked); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f68fa57a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e09ad2d..a5bd7b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -577,7 +577,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT */ private void startCaches() throws IgniteCheckedException { for (DynamicCacheChangeRequest req : reqs) { - if (req.isStart() || req.isClientStart()) + if (req.isStart()) cctx.cache().prepareCacheStart(req); } } @@ -690,10 +690,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (!F.isEmpty(reqs)) { for (DynamicCacheChangeRequest req : reqs) { if (F.eq(cacheCtx.name(), req.cacheName())) { - if (req.isStart()) - cacheCtx.preloader().onInitialExchangeComplete(err); - else if (req.isClientStart()) { - if (req.clientNodeId().equals(cacheCtx.localNodeId())) + if (req.isStart()) { + if (!req.clientStartOnly() || cacheCtx.localNodeId().equals(req.initiatingNodeId())) cacheCtx.preloader().onInitialExchangeComplete(err); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f68fa57a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index 6ea7cb4..60355f3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -432,7 +432,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void _testClientCache() throws Exception { + public void testClientCache() throws Exception { try { testAttribute = false; @@ -471,4 +471,91 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { stopGrid(nodeCount()); } } + + /** + * @throws Exception If failed. + */ + public void testStartFromClientNode() throws Exception { + try { + testAttribute = false; + + startGrid(nodeCount()); + + final IgniteKernal kernal = (IgniteKernal)grid(0); + + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + + ccfg.setName(DYNAMIC_CACHE_NAME); + + ccfg.setNodeFilter(NODE_FILTER); + + final IgniteKernal started = (IgniteKernal)grid(nodeCount()); + + started.context().cache().dynamicStartCache(ccfg, null).get(); + + GridCacheAdapter<Object, Object> cache = started.internalCache(DYNAMIC_CACHE_NAME); + + assertNotNull(cache); + assertFalse(cache.context().affinityNode()); + + // Should obtain client cache on new node. + IgniteCache<Object, Object> clientCache = ignite(nodeCount()).jcache(DYNAMIC_CACHE_NAME); + + clientCache.put("1", "1"); + + for (int g = 0; g < nodeCount() + 1; g++) + assertEquals("1", ignite(g).jcache(DYNAMIC_CACHE_NAME).get("1")); + + kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get(); + } + finally { + stopGrid(nodeCount()); + } + } + + /** + * @throws Exception If failed. + */ + public void testStartNearCacheFromClientNode() throws Exception { + try { + testAttribute = false; + + startGrid(nodeCount()); + + final IgniteKernal kernal = (IgniteKernal)grid(0); + + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + + ccfg.setName(DYNAMIC_CACHE_NAME); + + ccfg.setNodeFilter(NODE_FILTER); + + final IgniteKernal started = (IgniteKernal)grid(nodeCount()); + + NearCacheConfiguration nearCfg = new NearCacheConfiguration(); + + started.context().cache().dynamicStartCache(ccfg, nearCfg).get(); + + GridCacheAdapter<Object, Object> cache = started.internalCache(DYNAMIC_CACHE_NAME); + + assertNotNull(cache); + assertFalse(cache.context().affinityNode()); + assertTrue(cache.context().isNear()); + + // Should obtain client cache on new node. + IgniteCache<Object, Object> clientCache = ignite(nodeCount()).jcache(DYNAMIC_CACHE_NAME); + + clientCache.put("1", "1"); + + for (int g = 0; g < nodeCount() + 1; g++) + assertEquals("1", ignite(g).jcache(DYNAMIC_CACHE_NAME).get("1")); + + kernal.context().cache().dynamicStopCache(DYNAMIC_CACHE_NAME).get(); + } + finally { + stopGrid(nodeCount()); + } + } }