Corrected ServiceProcessor shutdown sequence
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d0b0765 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d0b0765 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d0b0765 Branch: refs/heads/ignite-5293 Commit: 1d0b0765134a81e6626a9ef1c70939085f954847 Parents: 31ac7e9a Author: Konstantin Dudkov <[email protected]> Authored: Mon Jun 5 11:06:44 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Jun 5 11:06:44 2017 +0300 ---------------------------------------------------------------------- .../service/GridServiceProcessor.java | 86 +++++++++----------- 1 file changed, 39 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1d0b0765/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index c2fee02..fd8141c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -281,56 +281,52 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite if (ctx.isDaemon()) return; + // Will not release it. busyLock.block(); - try { - U.shutdownNow(GridServiceProcessor.class, depExe, log); + U.shutdownNow(GridServiceProcessor.class, depExe, log); - if (!ctx.clientNode()) - ctx.event().removeDiscoveryEventListener(topLsnr); + if (!ctx.clientNode()) + ctx.event().removeDiscoveryEventListener(topLsnr); - Collection<ServiceContextImpl> ctxs = new ArrayList<>(); + Collection<ServiceContextImpl> ctxs = new ArrayList<>(); - synchronized (locSvcs) { - for (Collection<ServiceContextImpl> ctxs0 : locSvcs.values()) - ctxs.addAll(ctxs0); - } + synchronized (locSvcs) { + for (Collection<ServiceContextImpl> ctxs0 : locSvcs.values()) + ctxs.addAll(ctxs0); + } - for (ServiceContextImpl ctx : ctxs) { - ctx.setCancelled(true); + for (ServiceContextImpl ctx : ctxs) { + ctx.setCancelled(true); - Service svc = ctx.service(); + Service svc = ctx.service(); - if (svc != null) - svc.cancel(ctx); + if (svc != null) + svc.cancel(ctx); - ctx.executor().shutdownNow(); - } + ctx.executor().shutdownNow(); + } - for (ServiceContextImpl ctx : ctxs) { - try { - if (log.isInfoEnabled() && !ctxs.isEmpty()) - log.info("Shutting down distributed service [name=" + ctx.name() + ", execId8=" + - U.id8(ctx.executionId()) + ']'); + for (ServiceContextImpl ctx : ctxs) { + try { + if (log.isInfoEnabled() && !ctxs.isEmpty()) + log.info("Shutting down distributed service [name=" + ctx.name() + ", execId8=" + + U.id8(ctx.executionId()) + ']'); - ctx.executor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); - } - catch (InterruptedException ignore) { - Thread.currentThread().interrupt(); + ctx.executor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); - U.error(log, "Got interrupted while waiting for service to shutdown (will continue stopping node): " + - ctx.name()); - } + U.error(log, "Got interrupted while waiting for service to shutdown (will continue stopping node): " + + ctx.name()); } + } - Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping)."); + Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping)."); - cancelFutures(depFuts, err); - cancelFutures(undepFuts, err); - } - finally { - busyLock.unblock(); - } + cancelFutures(depFuts, err); + cancelFutures(undepFuts, err); if (log.isDebugEnabled()) log.debug("Stopped service processor."); @@ -462,10 +458,11 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite * @param name Service name. * @param svc Service. * @param cacheName Cache name. - * @param affKey Affinity key. + * @param affKey Affinity key. * @return Future. */ - public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service svc, String cacheName, Object affKey) { + public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service svc, String cacheName, + Object affKey) { A.notNull(affKey, "affKey"); ServiceConfiguration cfg = new ServiceConfiguration(); @@ -878,7 +875,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite Collection<ServiceContextImpl> ctxs; synchronized (locSvcs) { - ctxs = locSvcs.get(name); + ctxs = locSvcs.get(name); } if (ctxs == null) @@ -921,9 +918,9 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite while (true) { GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer.topologyVersion()); - Collection<ClusterNode> nodes; + Collection<ClusterNode> nodes; - // Call node filter outside of transaction. + // Call node filter outside of transaction. if (affKey == null) { nodes = ctx.discovery().nodes(topVer); @@ -1212,7 +1209,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite * @param cancelCnt Number of contexts to cancel. */ private void cancel(Iterable<ServiceContextImpl> ctxs, int cancelCnt) { - for (Iterator<ServiceContextImpl> it = ctxs.iterator(); it.hasNext();) { + for (Iterator<ServiceContextImpl> it = ctxs.iterator(); it.hasNext(); ) { ServiceContextImpl svcCtx = it.next(); // Flip cancelled flag. @@ -1285,7 +1282,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite GridCloseableIterator<Map.Entry<Object, Object>> iter = qry.executeScanQuery(); return cache.context().itHolder().iterator(iter, - new CacheIteratorConverter<Cache.Entry<Object, Object>, Map.Entry<Object,Object>>() { + new CacheIteratorConverter<Cache.Entry<Object, Object>, Map.Entry<Object, Object>>() { @Override protected Cache.Entry<Object, Object> convert(Map.Entry<Object, Object> e) { return new CacheEntryImpl<>(e.getKey(), e.getValue()); } @@ -1510,10 +1507,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite if (!busyLock.enterBusy()) return; - //Must check that threadpool was not shutdown. - if (depExe.isShutdown()) - return; - try { final AffinityTopologyVersion topVer; @@ -1726,7 +1719,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite undeploy(e.getKey().name()); } - /** * @param name Name. */
