Repository: ignite
Updated Branches:
refs/heads/master d27765afb -> 5181f6b53
Advertising
IGNITE-5975 Fixed hang on concurrent node stop and continuous query registration
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5181f6b5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5181f6b5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5181f6b5
Branch: refs/heads/master
Commit: 5181f6b53a1753d4e4c9714d66fb23cdc21f0e35
Parents: d27765a
Author: Alexey Goncharuk <alexey.goncha...@gmail.com>
Authored: Thu Apr 5 11:18:07 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Thu Apr 5 11:18:07 2018 +0300
----------------------------------------------------------------------
.../ignite/internal/NodeStoppingException.java | 7 ++
.../continuous/CacheContinuousQueryManager.java | 21 ++--
.../continuous/GridContinuousProcessor.java | 124 ++++++++++---------
...eAbstractDataStructuresFailoverSelfTest.java | 19 ++-
4 files changed, 103 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5181f6b5/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java b/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java
index 164983a..75447a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/NodeStoppingException.java
@@ -27,6 +27,13 @@ public class NodeStoppingException extends IgniteCheckedException {
private static final long serialVersionUID = 0L;
/**
+ * @param cause Original node stopping cause.
+ */
+ public NodeStoppingException(NodeStoppingException cause) {
+ super(cause);
+ }
+
+ /**
* @param msg Exception message.
*/
public NodeStoppingException(String msg) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5181f6b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index f070686..19225f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -52,6 +52,7 @@ import org.apache.ignite.cache.query.ContinuousQuery;
import
org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -681,18 +682,24 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
assert pred != null : cctx.config();
- UUID id = cctx.kernalContext().continuous().startRoutine(
- hnd,
- internal && loc,
- bufSize,
- timeInterval,
- autoUnsubscribe,
- pred).get();
+ UUID id = null;
try {
+ id = cctx.kernalContext().continuous().startRoutine(
+ hnd,
+ internal && loc,
+ bufSize,
+ timeInterval,
+ autoUnsubscribe,
+ pred).get();
+
if (hnd.isQuery() && cctx.userCache() && !onStart)
hnd.waitTopologyFuture(cctx.kernalContext());
}
+ catch (NodeStoppingException e) {
+ // Wrap original exception to show the source of continuous query
start stacktrace.
+ throw new NodeStoppingException(e);
+ }
catch (IgniteCheckedException e) {
log.warning("Failed to start continuous query.", e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5181f6b5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 026894d..01a5a71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -313,6 +314,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
finally {
processorStopLock.writeLock().unlock();
}
+
+ cancelFutures(new NodeStoppingException("Failed to start continuous
query (node is stopping)"));
}
/** {@inheritDoc} */
@@ -561,14 +564,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param rmtFilter Remote filter.
* @param prjPred Projection predicate.
* @return Routine ID.
- * @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
public UUID registerStaticRoutine(
String cacheName,
CacheEntryUpdatedListener<?, ?> locLsnr,
CacheEntryEventSerializableFilter rmtFilter,
- @Nullable IgnitePredicate<ClusterNode> prjPred) throws
IgniteCheckedException {
+ @Nullable IgnitePredicate<ClusterNode> prjPred) {
String topicPrefix = "CONTINUOUS_QUERY_STATIC" + "_" + cacheName;
CacheContinuousQueryHandler hnd = new CacheContinuousQueryHandler(
@@ -668,32 +670,40 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
// Register per-routine notifications listener if ordered messaging is
used.
registerMessageListener(hnd);
- StartFuture fut = new StartFuture(ctx, routineId);
-
- startFuts.put(routineId, fut);
+ if (!lockStopping())
+ return new GridFinishedFuture<>(new NodeStoppingException("Failed
to start continuous query (node is stopping)"));
try {
- if (locIncluded || hnd.isQuery())
- registerHandler(ctx.localNodeId(), routineId, hnd, bufSize,
interval, autoUnsubscribe, true);
+ StartFuture fut = new StartFuture(ctx, routineId);
- ctx.discovery().sendCustomEvent(new
StartRoutineDiscoveryMessage(routineId, reqData,
- reqData.handler().keepBinary()));
- }
- catch (IgniteCheckedException e) {
- startFuts.remove(routineId);
- locInfos.remove(routineId);
+ startFuts.put(routineId, fut);
- unregisterHandler(routineId, hnd, true);
+ try {
+ if (locIncluded || hnd.isQuery())
+ registerHandler(ctx.localNodeId(), routineId, hnd,
bufSize, interval, autoUnsubscribe, true);
- fut.onDone(e);
+ ctx.discovery().sendCustomEvent(new
StartRoutineDiscoveryMessage(routineId, reqData,
+ reqData.handler().keepBinary()));
+ }
+ catch (IgniteCheckedException e) {
+ startFuts.remove(routineId);
+ locInfos.remove(routineId);
- return fut;
- }
+ unregisterHandler(routineId, hnd, true);
+
+ fut.onDone(e);
+
+ return fut;
+ }
- // Handler is registered locally.
- fut.onLocalRegistered();
+ // Handler is registered locally.
+ fut.onLocalRegistered();
- return fut;
+ return fut;
+ }
+ finally {
+ unlockStopping();
+ }
}
/**
@@ -734,46 +744,54 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
boolean doStop = false;
- StopFuture fut = stopFuts.get(routineId);
+ if (!lockStopping())
+ return new GridFinishedFuture<>(new NodeStoppingException("Failed
to stop continuous query (node is stopping)"));
- // Only one thread will stop routine with provided ID.
- if (fut == null) {
- StopFuture old = stopFuts.putIfAbsent(routineId, fut = new
StopFuture(ctx));
+ try {
+ StopFuture fut = stopFuts.get(routineId);
- if (old != null)
- fut = old;
- else
- doStop = true;
- }
+ // Only one thread will stop routine with provided ID.
+ if (fut == null) {
+ StopFuture old = stopFuts.putIfAbsent(routineId, fut = new
StopFuture(ctx));
- if (doStop) {
- // Unregister routine locally.
- LocalRoutineInfo routine = locInfos.remove(routineId);
+ if (old != null)
+ fut = old;
+ else
+ doStop = true;
+ }
- // Finish if routine is not found (wrong ID is provided).
- if (routine == null) {
- stopFuts.remove(routineId);
+ if (doStop) {
+ // Unregister routine locally.
+ LocalRoutineInfo routine = locInfos.remove(routineId);
- fut.onDone();
+ // Finish if routine is not found (wrong ID is provided).
+ if (routine == null) {
+ stopFuts.remove(routineId);
- return fut;
- }
+ fut.onDone();
- // Unregister handler locally.
- unregisterHandler(routineId, routine.hnd, true);
+ return fut;
+ }
- try {
- ctx.discovery().sendCustomEvent(new
StopRoutineDiscoveryMessage(routineId));
- }
- catch (IgniteCheckedException e) {
- fut.onDone(e);
+ // Unregister handler locally.
+ unregisterHandler(routineId, routine.hnd, true);
+
+ try {
+ ctx.discovery().sendCustomEvent(new
StopRoutineDiscoveryMessage(routineId));
+ }
+ catch (IgniteCheckedException e) {
+ fut.onDone(e);
+ }
+
+ if (ctx.isStopping())
+ fut.onDone();
}
- if (ctx.isStopping())
- fut.onDone();
+ return fut;
+ }
+ finally {
+ unlockStopping();
}
-
- return fut;
}
/**
@@ -883,7 +901,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws
IgniteCheckedException {
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
cancelFutures(new
IgniteClientDisconnectedCheckedException(reconnectFut, "Client node
disconnected."));
if (log.isDebugEnabled()) {
@@ -1951,9 +1969,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* Future for stop routine.
*/
private static class StopFuture extends GridFutureAdapter<Object> {
- /** */
- private static final long serialVersionUID = 0L;
-
/** Timeout object. */
private volatile GridTimeoutObject timeoutObj;
@@ -1997,9 +2012,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
*/
private static class SyncMessageAckFuture extends
GridFutureAdapter<Object> {
/** */
- private static final long serialVersionUID = 0L;
-
- /** */
private UUID nodeId;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/5181f6b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 826c1a9..9f9e577 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -26,8 +26,10 @@ import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
@@ -539,21 +541,24 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testSemaphoreSingleNodeFailure() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-5975");
-
final Ignite i1 = grid(0);
IgniteSemaphore sem1 = i1.semaphore(STRUCTURE_NAME, 1, false, true);
sem1.acquire();
+ final CountDownLatch createLatch = new CountDownLatch(1);
+
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new
Callable<Void>() {
- @Override public Void call() throws Exception {
+ @Override public Void call() {
boolean failed = true;
IgniteSemaphore sem2 = i1.semaphore(STRUCTURE_NAME, 1, false,
true);
try {
+ // Guard the acquire call by count down latch to make sure
that semaphore creation does not fail.
+ createLatch.countDown();
+
sem2.acquire();
}
catch (Exception ignored){
@@ -568,10 +573,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
}
});
- while(!sem1.hasQueuedThreads()){
+ assertTrue("Failed to wait for semaphore creation",
+ createLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
+
+ while(!sem1.hasQueuedThreads()) {
try {
Thread.sleep(1);
- } catch (InterruptedException ignored) {
+ }
+ catch (InterruptedException ignored) {
fail();
}
}