IGNITE-3440: Ignite Services: ServiceTopologyCallable is executed before system cache is started (cherry picked from commit 06b24a9)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c7eed885 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c7eed885 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c7eed885 Branch: refs/heads/ignite-1232-1 Commit: c7eed8859202b99cc64b7f6d6efe46acf71ad72a Parents: cd1d618 Author: Denis Magda <[email protected]> Authored: Wed Jul 13 14:19:51 2016 +0300 Committer: Denis Magda <[email protected]> Committed: Wed Jul 13 15:28:50 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 2 + .../service/GridServiceProcessor.java | 70 +++++++++++++++++++- 2 files changed, 71 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c7eed885/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 9451254..e104b87 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -821,6 +821,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (!ctx.config().isDaemon()) ctx.cacheObjects().onUtilityCacheStarted(); + ctx.service().onUtilityCacheStarted(); + // Wait for caches in SYNC preload mode. 
for (DynamicCacheDescriptor desc : registeredCaches.values()) { CacheConfiguration cfg = desc.cacheConfiguration(); http://git-wip-us.apache.org/repos/asf/ignite/blob/c7eed885/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index fd895ea..01b7302 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -38,8 +38,10 @@ import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJobContext; import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; @@ -86,6 +88,8 @@ import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.JobContextResource; +import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceConfiguration; import org.apache.ignite.services.ServiceDescriptor; @@ -137,6 +141,9 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** Deployment futures. 
*/ private final ConcurrentMap<String, GridFutureAdapter<?>> undepFuts = new ConcurrentHashMap8<>(); + /** Pending compute job contexts that are waiting for utility cache initialization. */ + private final List<ComputeJobContext> pendingJobCtxs = new ArrayList<>(0); + /** Deployment executor service. */ private final ExecutorService depExe; @@ -1325,6 +1332,23 @@ public class GridServiceProcessor extends GridProcessorAdapter { } /** + * Called right after utility cache is started and ready for use. + */ + public void onUtilityCacheStarted() { + synchronized (pendingJobCtxs) { + if (pendingJobCtxs.size() == 0) + return; + + Iterator<ComputeJobContext> iter = pendingJobCtxs.iterator(); + + while (iter.hasNext()) { + iter.next().callcc(); + iter.remove(); + } + } + } + + /** * Service deployment listener. */ @SuppressWarnings("unchecked") @@ -1809,9 +1833,20 @@ public class GridServiceProcessor extends GridProcessorAdapter { private final String svcName; /** */ + private boolean waitedCacheInit; + + /** */ @IgniteInstanceResource private IgniteEx ignite; + /** */ + @JobContextResource + private ComputeJobContext jCtx; + + /** */ + @LoggerResource + private IgniteLogger log; + /** * @param svcName Service name. */ @@ -1821,7 +1856,40 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public Map<UUID, Integer> call() throws Exception { - return serviceTopology(ignite.context().cache().utilityCache(), svcName); + IgniteInternalCache<Object, Object> cache = ignite.context().cache().utilityCache(); + + if (cache == null) { + List<ComputeJobContext> pendingCtxs = ignite.context().service().pendingJobCtxs; + + synchronized (pendingCtxs) { + // Double check cache reference after lock acquisition. + cache = ignite.context().cache().utilityCache(); + + if (cache == null) { + if (!waitedCacheInit) { + log.debug("Utility cache hasn't been initialized yet. Waiting."); + + // waiting for a minute for cache initialization.
+ jCtx.holdcc(60 * 1000); + + pendingCtxs.add(jCtx); + + waitedCacheInit = true; + + return null; + } + else { + log.error("Failed to gather service topology. Utility " + + "cache initialization is stuck."); + + throw new IgniteCheckedException("Failed to gather service topology. Utility " + + "cache initialization is stuck."); + } + } + } + } + + return serviceTopology(cache, svcName); } }
