Repository: incubator-ignite Updated Branches: refs/heads/ignite-709_3 fb1d79cf1 -> 64ed3f19c
# ignite-709_3 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/64ed3f19 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/64ed3f19 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/64ed3f19 Branch: refs/heads/ignite-709_3 Commit: 64ed3f19c1a3c65b4d47f806a212c52bfabb3dae Parents: fb1d79c Author: sboikov <[email protected]> Authored: Thu May 14 21:55:51 2015 +0300 Committer: sboikov <[email protected]> Committed: Thu May 14 21:55:51 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 144 ++++++++++++------- .../preloader/GridDhtPartitionDemandPool.java | 2 +- .../datastructures/DataStructuresProcessor.java | 2 +- 3 files changed, 91 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64ed3f19/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 9319b45..331e454 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -127,6 +127,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Count down latch for caches. */ private final CountDownLatch cacheStartedLatch = new CountDownLatch(1); + /** */ + private final GridFutureAdapter<Object> sysCacheStartFut = new GridFutureAdapter<>(); + /** * @param ctx Kernal context. */ @@ -661,6 +664,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStart() throws IgniteCheckedException { + DynamicCacheDescriptor marshCacheDesc = null; + DynamicCacheDescriptor utilityCacheDesc = null; + DynamicCacheDescriptor atomicsCacheDesc = null; + try { if (ctx.config().isDaemon()) return; @@ -727,17 +734,34 @@ public class GridCacheProcessor extends GridProcessorAdapter { jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); } + + if (CU.MARSH_CACHE_NAME.equals(ccfg.getName())) + marshCacheDesc = desc; + else if (CU.UTILITY_CACHE_NAME.equals(ccfg.getName())) + utilityCacheDesc = desc; + else if (CU.ATOMICS_CACHE_NAME.equals(ccfg.getName())) + atomicsCacheDesc = desc; } } finally { cacheStartedLatch.countDown(); } - if (marshallerCache() == null) { - IgniteInternalFuture<?> fut = marshallerCacheAsync(); + if (ctx.config().isClientMode()) { + assert marshCacheDesc != null; + assert utilityCacheDesc != null; + assert atomicsCacheDesc != null; + + Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(); - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { + reqs.add(clientSystemCacheRequest(marshCacheDesc, new NearCacheConfiguration())); + reqs.add(clientSystemCacheRequest(utilityCacheDesc, null)); + reqs.add(clientSystemCacheRequest(atomicsCacheDesc, new NearCacheConfiguration())); + + startClientSystemCaches(reqs); + + sysCacheStartFut.listen(new CI1<IgniteInternalFuture<Object>>() { + @Override public void apply(IgniteInternalFuture<Object> fut) { try { marshallerCacheCallbacks(); } @@ -747,8 +771,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { } }); } - else + else { + sysCacheStartFut.onDone(); + marshallerCacheCallbacks(); + } // Must call onKernalStart on shared managers after creation of fetched caches. for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) @@ -800,6 +827,51 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param reqs Start requests. + */ + private void startClientSystemCaches(Collection<DynamicCacheChangeRequest> reqs) { + assert !F.isEmpty(reqs) : reqs; + + GridCompoundFuture<Object, Object> fut = new GridCompoundFuture<>(); + + for (DynamicCacheStartFuture startFut : initiateCacheChanges(reqs)) + fut.add(startFut); + + fut.markInitialized(); + + fut.listen(new CI1<IgniteInternalFuture<Object>>() { + @Override public void apply(IgniteInternalFuture<Object> fut) { + sysCacheStartFut.onDone(); + } + }); + } + + /** + * @param cacheDesc Cache descriptor. + * @return Cache change request. + */ + private DynamicCacheChangeRequest clientSystemCacheRequest( + DynamicCacheDescriptor cacheDesc, + @Nullable NearCacheConfiguration nearCfg) + { + DynamicCacheChangeRequest desc = new DynamicCacheChangeRequest( + cacheDesc.cacheConfiguration().getName(), + ctx.localNodeId()); + + desc.clientStartOnly(true); + + desc.nearCacheConfiguration(nearCfg); + + desc.deploymentId(cacheDesc.deploymentId()); + + desc.startCacheConfiguration(cacheDesc.cacheConfiguration()); + + desc.cacheType(cacheDesc.cacheType()); + + return desc; + } + + /** * @throws IgniteCheckedException If failed. */ private void marshallerCacheCallbacks() throws IgniteCheckedException { @@ -856,6 +928,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { @Override public void onKernalStop(boolean cancel) { cacheStartedLatch.countDown(); + sysCacheStartFut.onDone(); + if (ctx.config().isDaemon()) return; @@ -2534,7 +2608,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { assert ctx.config().isClientMode() : "Utility cache is missed on server node."; - getOrStartCache(CU.UTILITY_CACHE_NAME); + sysCacheStartFut.get(); return internalCache(CU.UTILITY_CACHE_NAME); } @@ -2542,24 +2616,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @return Utility cache start future. */ - public IgniteInternalFuture<?> utilityCacheAsync() { - if (internalCache(CU.UTILITY_CACHE_NAME) != null) - return new GridFinishedFuture<>(); - - return startCacheAsync(CU.UTILITY_CACHE_NAME, null, true); - } - - /** - * @return Utility cache start future. - */ - public IgniteInternalFuture<?> marshallerCacheAsync() { - if (internalCache(CU.MARSH_CACHE_NAME) != null) - return new GridFinishedFuture<>(); - - assert ctx.config().isClientMode() : "Marshaller cache is missed on server node."; - - // On client node use near-only marshaller cache. - return startCacheAsync(CU.MARSH_CACHE_NAME, new NearCacheConfiguration(), false); + public IgniteInternalFuture<?> systemCachesStartFuture() { + return sysCacheStartFut; } /** @@ -2639,35 +2697,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ private IgniteCache startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException { - IgniteInternalFuture<?> fut = startCacheAsync(cacheName, null, failIfNotStarted); - - if (fut != null) { - fut.get(); - - String masked = maskNull(cacheName); - - IgniteCache cache = jCacheProxies.get(masked); - - if (cache == null && failIfNotStarted) - throw new IllegalArgumentException("Cache is not started: " + cacheName); - - return cache; - } - - return null; - } - - /** - * @param cacheName Cache name. - * @param nearCfg Near cache configuration. - * @param failIfNotStarted If {@code true} throws {@link IllegalArgumentException} if cache is not started, - * otherwise returns {@code null} in this case. - * @return Future. - */ - @Nullable public IgniteInternalFuture<?> startCacheAsync(String cacheName, - @Nullable NearCacheConfiguration nearCfg, - boolean failIfNotStarted) - { String masked = maskNull(cacheName); DynamicCacheDescriptor desc = registeredCaches.get(masked); @@ -2695,9 +2724,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.clientStartOnly(true); - req.nearCacheConfiguration(nearCfg); + F.first(initiateCacheChanges(F.asList(req))).get(); - return F.first(initiateCacheChanges(F.asList(req))); + IgniteCache cache = jCacheProxies.get(masked); + + if (cache == null && failIfNotStarted) + throw new IllegalArgumentException("Cache is not started: " + cacheName); + + return cache; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64ed3f19/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index ba2af9e..4153c5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -824,7 +824,7 @@ public class GridDhtPartitionDemandPool<K, V> { log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']'); try { - cctx.kernalContext().cache().marshallerCacheAsync().get(); + cctx.kernalContext().cache().systemCachesStartFuture().get(); cctx.kernalContext().cache().marshallerCache().context().awaitStarted(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64ed3f19/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 0a90e32..b6d4b40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -128,7 +128,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { if (atomicsCache == null) { assert ctx.config().isClientMode() : "Atomics cache is missed on server node."; - ctx.cache().startCacheAsync(CU.ATOMICS_CACHE_NAME, new NearCacheConfiguration(), true).get(); + ctx.cache().systemCachesStartFuture().get(); atomicsCache = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME); }
