IGNITE-3440: Ignite Services: ServiceTopologyCallable is executed before system cache is started
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/06b24a9b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/06b24a9b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/06b24a9b Branch: refs/heads/ignite-3414 Commit: 06b24a9b3952598604e02d80d1ed76a52d85e743 Parents: 89d64e7 Author: Denis Magda <[email protected]> Authored: Wed Jul 13 14:19:51 2016 +0300 Committer: Denis Magda <[email protected]> Committed: Wed Jul 13 14:19:51 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 2 + .../service/GridServiceProcessor.java | 70 +++++++++++++++++++- 2 files changed, 71 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/06b24a9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 414a915..6484d4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -797,6 +797,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (!ctx.config().isDaemon()) ctx.cacheObjects().onUtilityCacheStarted(); + ctx.service().onUtilityCacheStarted(); + // Wait for caches in SYNC preload mode. 
for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) { GridCacheAdapter cache = caches.get(maskNull(cfg.getName())); http://git-wip-us.apache.org/repos/asf/ignite/blob/06b24a9b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 53eaeb5..b418ba2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -38,8 +38,10 @@ import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJobContext; import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; @@ -81,6 +83,8 @@ import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.JobContextResource; +import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceConfiguration; import org.apache.ignite.services.ServiceDescriptor; @@ -125,6 +129,9 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** Deployment futures. 
*/ private final ConcurrentMap<String, GridFutureAdapter<?>> undepFuts = new ConcurrentHashMap8<>(); + /** Pending compute job contexts that are waiting for utility cache initialization. */ + private final List<ComputeJobContext> pendingJobCtxs = new ArrayList<>(0); + /** Deployment executor service. */ private final ExecutorService depExe; @@ -1304,6 +1311,23 @@ public class GridServiceProcessor extends GridProcessorAdapter { } /** + * Called right after utility cache is started and ready for use. + */ + public void onUtilityCacheStarted() { + synchronized (pendingJobCtxs) { + if (pendingJobCtxs.size() == 0) + return; + + Iterator<ComputeJobContext> iter = pendingJobCtxs.iterator(); + + while (iter.hasNext()) { + iter.next().callcc(); + iter.remove(); + } + } + } + + /** * Service deployment listener. */ @SuppressWarnings("unchecked") @@ -1783,9 +1807,20 @@ public class GridServiceProcessor extends GridProcessorAdapter { private final String svcName; /** */ + private boolean waitedCacheInit; + + /** */ @IgniteInstanceResource private IgniteEx ignite; + /** */ + @JobContextResource + private ComputeJobContext jCtx; + + /** */ + @LoggerResource + private IgniteLogger log; + /** * @param svcName Service name. */ @@ -1795,7 +1830,40 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public Map<UUID, Integer> call() throws Exception { - return serviceTopology(ignite.context().cache().utilityCache(), svcName); + IgniteInternalCache<Object, Object> cache = ignite.context().cache().utilityCache(); + + if (cache == null) { + List<ComputeJobContext> pendingCtxs = ignite.context().service().pendingJobCtxs; + + synchronized (pendingCtxs) { + // Double check cache reference after lock acquisition. + cache = ignite.context().cache().utilityCache(); + + if (cache == null) { + if (!waitedCacheInit) { + log.debug("Utility cache hasn't been initialized yet. Waiting."); + + // Wait up to a minute for cache initialization. 
+ jCtx.holdcc(60 * 1000); + + pendingCtxs.add(jCtx); + + waitedCacheInit = true; + + return null; + } + else { + log.error("Failed to gather service topology. Utility " + + "cache initialization is stuck."); + + throw new IgniteCheckedException("Failed to gather service topology. Utility " + + "cache initialization is stuck."); + } + } + } + } + + return serviceTopology(cache, svcName); } }
