Repository: ignite
Updated Branches:
  refs/heads/master d27765afb -> 5181f6b53


IGNITE-5975 Fixed hang on concurrent node stop and continuous query registration


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5181f6b5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5181f6b5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5181f6b5

Branch: refs/heads/master
Commit: 5181f6b53a1753d4e4c9714d66fb23cdc21f0e35
Parents: d27765a
Author: Alexey Goncharuk <alexey.goncha...@gmail.com>
Authored: Thu Apr 5 11:18:07 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Thu Apr 5 11:18:07 2018 +0300

----------------------------------------------------------------------
 .../ignite/internal/NodeStoppingException.java  |   7 ++
 .../continuous/CacheContinuousQueryManager.java |  21 ++--
 .../continuous/GridContinuousProcessor.java     | 124 ++++++++++---------
 ...eAbstractDataStructuresFailoverSelfTest.java |  19 ++-
 4 files changed, 103 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5181f6b5/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java
index 164983a..75447a1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java
@@ -27,6 +27,13 @@ public class NodeStoppingException extends 
IgniteCheckedException {
     private static final long serialVersionUID = 0L;
 
     /**
+     * @param cause Original node stopping cause.
+     */
+    public NodeStoppingException(NodeStoppingException cause) {
+        super(cause);
+    }
+
+    /**
      * @param msg Exception message.
      */
     public NodeStoppingException(String msg) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5181f6b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index f070686..19225f8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -52,6 +52,7 @@ import org.apache.ignite.cache.query.ContinuousQuery;
 import 
org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -681,18 +682,24 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
 
         assert pred != null : cctx.config();
 
-        UUID id = cctx.kernalContext().continuous().startRoutine(
-            hnd,
-            internal && loc,
-            bufSize,
-            timeInterval,
-            autoUnsubscribe,
-            pred).get();
+        UUID id = null;
 
         try {
+            id = cctx.kernalContext().continuous().startRoutine(
+                hnd,
+                internal && loc,
+                bufSize,
+                timeInterval,
+                autoUnsubscribe,
+                pred).get();
+
             if (hnd.isQuery() && cctx.userCache() && !onStart)
                 hnd.waitTopologyFuture(cctx.kernalContext());
         }
+        catch (NodeStoppingException e) {
+            // Wrap original exception to show the source of continuous query start stacktrace.
+            throw new NodeStoppingException(e);
+        }
         catch (IgniteCheckedException e) {
             log.warning("Failed to start continuous query.", e);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5181f6b5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 026894d..01a5a71 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -49,6 +49,7 @@ import 
org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -313,6 +314,8 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         finally {
             processorStopLock.writeLock().unlock();
         }
+
+        cancelFutures(new NodeStoppingException("Failed to start continuous query (node is stopping)"));
     }
 
     /** {@inheritDoc} */
@@ -561,14 +564,13 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      * @param rmtFilter Remote filter.
      * @param prjPred Projection predicate.
      * @return Routine ID.
-     * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
     public UUID registerStaticRoutine(
         String cacheName,
         CacheEntryUpdatedListener<?, ?> locLsnr,
         CacheEntryEventSerializableFilter rmtFilter,
-        @Nullable IgnitePredicate<ClusterNode> prjPred) throws 
IgniteCheckedException {
+        @Nullable IgnitePredicate<ClusterNode> prjPred) {
         String topicPrefix = "CONTINUOUS_QUERY_STATIC" + "_" + cacheName;
 
         CacheContinuousQueryHandler hnd = new CacheContinuousQueryHandler(
@@ -668,32 +670,40 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         // Register per-routine notifications listener if ordered messaging is 
used.
         registerMessageListener(hnd);
 
-        StartFuture fut = new StartFuture(ctx, routineId);
-
-        startFuts.put(routineId, fut);
+        if (!lockStopping())
+            return new GridFinishedFuture<>(new NodeStoppingException("Failed to start continuous query (node is stopping)"));
 
         try {
-            if (locIncluded || hnd.isQuery())
-                registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, 
interval, autoUnsubscribe, true);
+            StartFuture fut = new StartFuture(ctx, routineId);
 
-            ctx.discovery().sendCustomEvent(new 
StartRoutineDiscoveryMessage(routineId, reqData,
-                reqData.handler().keepBinary()));
-        }
-        catch (IgniteCheckedException e) {
-            startFuts.remove(routineId);
-            locInfos.remove(routineId);
+            startFuts.put(routineId, fut);
 
-            unregisterHandler(routineId, hnd, true);
+            try {
+                if (locIncluded || hnd.isQuery())
+                    registerHandler(ctx.localNodeId(), routineId, hnd, 
bufSize, interval, autoUnsubscribe, true);
 
-            fut.onDone(e);
+                ctx.discovery().sendCustomEvent(new 
StartRoutineDiscoveryMessage(routineId, reqData,
+                    reqData.handler().keepBinary()));
+            }
+            catch (IgniteCheckedException e) {
+                startFuts.remove(routineId);
+                locInfos.remove(routineId);
 
-            return fut;
-        }
+                unregisterHandler(routineId, hnd, true);
+
+                fut.onDone(e);
+
+                return fut;
+            }
 
-        // Handler is registered locally.
-        fut.onLocalRegistered();
+            // Handler is registered locally.
+            fut.onLocalRegistered();
 
-        return fut;
+            return fut;
+        }
+        finally {
+            unlockStopping();
+        }
     }
 
     /**
@@ -734,46 +744,54 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
         boolean doStop = false;
 
-        StopFuture fut = stopFuts.get(routineId);
+        if (!lockStopping())
+            return new GridFinishedFuture<>(new NodeStoppingException("Failed to stop continuous query (node is stopping)"));
 
-        // Only one thread will stop routine with provided ID.
-        if (fut == null) {
-            StopFuture old = stopFuts.putIfAbsent(routineId, fut = new 
StopFuture(ctx));
+        try {
+            StopFuture fut = stopFuts.get(routineId);
 
-            if (old != null)
-                fut = old;
-            else
-                doStop = true;
-        }
+            // Only one thread will stop routine with provided ID.
+            if (fut == null) {
+                StopFuture old = stopFuts.putIfAbsent(routineId, fut = new 
StopFuture(ctx));
 
-        if (doStop) {
-            // Unregister routine locally.
-            LocalRoutineInfo routine = locInfos.remove(routineId);
+                if (old != null)
+                    fut = old;
+                else
+                    doStop = true;
+            }
 
-            // Finish if routine is not found (wrong ID is provided).
-            if (routine == null) {
-                stopFuts.remove(routineId);
+            if (doStop) {
+                // Unregister routine locally.
+                LocalRoutineInfo routine = locInfos.remove(routineId);
 
-                fut.onDone();
+                // Finish if routine is not found (wrong ID is provided).
+                if (routine == null) {
+                    stopFuts.remove(routineId);
 
-                return fut;
-            }
+                    fut.onDone();
 
-            // Unregister handler locally.
-            unregisterHandler(routineId, routine.hnd, true);
+                    return fut;
+                }
 
-            try {
-                ctx.discovery().sendCustomEvent(new 
StopRoutineDiscoveryMessage(routineId));
-            }
-            catch (IgniteCheckedException e) {
-                fut.onDone(e);
+                // Unregister handler locally.
+                unregisterHandler(routineId, routine.hnd, true);
+
+                try {
+                    ctx.discovery().sendCustomEvent(new 
StopRoutineDiscoveryMessage(routineId));
+                }
+                catch (IgniteCheckedException e) {
+                    fut.onDone(e);
+                }
+
+                if (ctx.isStopping())
+                    fut.onDone();
             }
 
-            if (ctx.isStopping())
-                fut.onDone();
+            return fut;
+        }
+        finally {
+            unlockStopping();
         }
-
-        return fut;
     }
 
     /**
@@ -883,7 +901,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws 
IgniteCheckedException {
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
         cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected."));
 
         if (log.isDebugEnabled()) {
@@ -1951,9 +1969,6 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      * Future for stop routine.
      */
     private static class StopFuture extends GridFutureAdapter<Object> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
         /** Timeout object. */
         private volatile GridTimeoutObject timeoutObj;
 
@@ -1997,9 +2012,6 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      */
     private static class SyncMessageAckFuture extends 
GridFutureAdapter<Object> {
         /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
         private UUID nodeId;
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5181f6b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 826c1a9..9f9e577 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -26,8 +26,10 @@ import java.util.UUID;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
@@ -539,21 +541,24 @@ public abstract class 
GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testSemaphoreSingleNodeFailure() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-5975");
-
         final Ignite i1 = grid(0);
 
         IgniteSemaphore sem1 = i1.semaphore(STRUCTURE_NAME, 1, false, true);
 
         sem1.acquire();
 
+        final CountDownLatch createLatch = new CountDownLatch(1);
+
         IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new 
Callable<Void>() {
-            @Override public Void call() throws Exception {
+            @Override public Void call() {
                 boolean failed = true;
 
                 IgniteSemaphore sem2 = i1.semaphore(STRUCTURE_NAME, 1, false, 
true);
 
                 try {
+                    // Guard the acquire call by count down latch to make sure that semaphore creation does not fail.
+                    createLatch.countDown();
+
                     sem2.acquire();
                 }
                 catch (Exception ignored){
@@ -568,10 +573,14 @@ public abstract class 
GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
             }
         });
 
-        while(!sem1.hasQueuedThreads()){
+        assertTrue("Failed to wait for semaphore creation",
+            createLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
+
+        while(!sem1.hasQueuedThreads()) {
             try {
                 Thread.sleep(1);
-            } catch (InterruptedException ignored) {
+            }
+            catch (InterruptedException ignored) {
                 fail();
             }
         }

Reply via email to