Corrected ServiceProcessor shutdown sequence

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d0b0765
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d0b0765
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d0b0765

Branch: refs/heads/ignite-5293
Commit: 1d0b0765134a81e6626a9ef1c70939085f954847
Parents: 31ac7e9a
Author: Konstantin Dudkov <[email protected]>
Authored: Mon Jun 5 11:06:44 2017 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Mon Jun 5 11:06:44 2017 +0300

----------------------------------------------------------------------
 .../service/GridServiceProcessor.java           | 86 +++++++++-----------
 1 file changed, 39 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1d0b0765/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index c2fee02..fd8141c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -281,56 +281,52 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
         if (ctx.isDaemon())
             return;
 
+        // Will not release it.
         busyLock.block();
 
-        try {
-            U.shutdownNow(GridServiceProcessor.class, depExe, log);
+        U.shutdownNow(GridServiceProcessor.class, depExe, log);
 
-            if (!ctx.clientNode())
-                ctx.event().removeDiscoveryEventListener(topLsnr);
+        if (!ctx.clientNode())
+            ctx.event().removeDiscoveryEventListener(topLsnr);
 
-            Collection<ServiceContextImpl> ctxs = new ArrayList<>();
+        Collection<ServiceContextImpl> ctxs = new ArrayList<>();
 
-            synchronized (locSvcs) {
-                for (Collection<ServiceContextImpl> ctxs0 : locSvcs.values())
-                    ctxs.addAll(ctxs0);
-            }
+        synchronized (locSvcs) {
+            for (Collection<ServiceContextImpl> ctxs0 : locSvcs.values())
+                ctxs.addAll(ctxs0);
+        }
 
-            for (ServiceContextImpl ctx : ctxs) {
-                ctx.setCancelled(true);
+        for (ServiceContextImpl ctx : ctxs) {
+            ctx.setCancelled(true);
 
-                Service svc = ctx.service();
+            Service svc = ctx.service();
 
-                if (svc != null)
-                    svc.cancel(ctx);
+            if (svc != null)
+                svc.cancel(ctx);
 
-                ctx.executor().shutdownNow();
-            }
+            ctx.executor().shutdownNow();
+        }
 
-            for (ServiceContextImpl ctx : ctxs) {
-                try {
-                    if (log.isInfoEnabled() && !ctxs.isEmpty())
-                        log.info("Shutting down distributed service [name=" + 
ctx.name() + ", execId8=" +
-                            U.id8(ctx.executionId()) + ']');
+        for (ServiceContextImpl ctx : ctxs) {
+            try {
+                if (log.isInfoEnabled() && !ctxs.isEmpty())
+                    log.info("Shutting down distributed service [name=" + 
ctx.name() + ", execId8=" +
+                        U.id8(ctx.executionId()) + ']');
 
-                    ctx.executor().awaitTermination(Long.MAX_VALUE, 
TimeUnit.MILLISECONDS);
-                }
-                catch (InterruptedException ignore) {
-                    Thread.currentThread().interrupt();
+                ctx.executor().awaitTermination(Long.MAX_VALUE, 
TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException ignore) {
+                Thread.currentThread().interrupt();
 
-                    U.error(log, "Got interrupted while waiting for service to 
shutdown (will continue stopping node): " +
-                        ctx.name());
-                }
+                U.error(log, "Got interrupted while waiting for service to 
shutdown (will continue stopping node): " +
+                    ctx.name());
             }
+        }
 
-            Exception err = new IgniteCheckedException("Operation has been 
cancelled (node is stopping).");
+        Exception err = new IgniteCheckedException("Operation has been 
cancelled (node is stopping).");
 
-            cancelFutures(depFuts, err);
-            cancelFutures(undepFuts, err);
-        }
-        finally {
-            busyLock.unblock();
-        }
+        cancelFutures(depFuts, err);
+        cancelFutures(undepFuts, err);
 
         if (log.isDebugEnabled())
             log.debug("Stopped service processor.");
@@ -462,10 +458,11 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
      * @param name Service name.
      * @param svc Service.
      * @param cacheName Cache name.
-     * @param  affKey Affinity key.
+     * @param affKey Affinity key.
      * @return Future.
      */
-    public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, 
Service svc, String cacheName, Object affKey) {
+    public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, 
Service svc, String cacheName,
+        Object affKey) {
         A.notNull(affKey, "affKey");
 
         ServiceConfiguration cfg = new ServiceConfiguration();
@@ -878,7 +875,7 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
         Collection<ServiceContextImpl> ctxs;
 
         synchronized (locSvcs) {
-             ctxs = locSvcs.get(name);
+            ctxs = locSvcs.get(name);
         }
 
         if (ctxs == null)
@@ -921,9 +918,9 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
         while (true) {
             GridServiceAssignments assigns = new GridServiceAssignments(cfg, 
dep.nodeId(), topVer.topologyVersion());
 
-             Collection<ClusterNode> nodes;
+            Collection<ClusterNode> nodes;
 
-             // Call node filter outside of transaction.
+            // Call node filter outside of transaction.
             if (affKey == null) {
                 nodes = ctx.discovery().nodes(topVer);
 
@@ -1212,7 +1209,7 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
      * @param cancelCnt Number of contexts to cancel.
      */
     private void cancel(Iterable<ServiceContextImpl> ctxs, int cancelCnt) {
-        for (Iterator<ServiceContextImpl> it = ctxs.iterator(); it.hasNext();) 
{
+        for (Iterator<ServiceContextImpl> it = ctxs.iterator(); it.hasNext(); 
) {
             ServiceContextImpl svcCtx = it.next();
 
             // Flip cancelled flag.
@@ -1285,7 +1282,7 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
             GridCloseableIterator<Map.Entry<Object, Object>> iter = 
qry.executeScanQuery();
 
             return cache.context().itHolder().iterator(iter,
-                new CacheIteratorConverter<Cache.Entry<Object, Object>, 
Map.Entry<Object,Object>>() {
+                new CacheIteratorConverter<Cache.Entry<Object, Object>, 
Map.Entry<Object, Object>>() {
                     @Override protected Cache.Entry<Object, Object> 
convert(Map.Entry<Object, Object> e) {
                         return new CacheEntryImpl<>(e.getKey(), e.getValue());
                     }
@@ -1510,10 +1507,6 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
             if (!busyLock.enterBusy())
                 return;
 
-            //Must check that threadpool was not shutdown.
-            if (depExe.isShutdown())
-                return;
-
             try {
                 final AffinityTopologyVersion topVer;
 
@@ -1726,7 +1719,6 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
             undeploy(e.getKey().name());
     }
 
-
     /**
      * @param name Name.
      */

Reply via email to