Repository: ignite Updated Branches: refs/heads/master 78c2d3bbb -> 754c7337d
IGNITE-9675 Fixed deadlock on Ignite#active() and concurrent node stop - Fixes #4822. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/754c7337 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/754c7337 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/754c7337 Branch: refs/heads/master Commit: 754c7337de123ac44e2816d2a55ab6f76cd03eac Parents: 78c2d3b Author: Alexey Platonov <aplaton...@gmail.com> Authored: Fri Oct 5 11:57:07 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Fri Oct 5 12:45:45 2018 +0300 ---------------------------------------------------------------------- .../cluster/GridClusterStateProcessor.java | 50 ++++++++++++-------- .../cluster/IGridClusterStateProcessor.java | 6 +++ .../processors/task/GridTaskProcessor.java | 25 +++++++++- .../processors/igfs/IgfsIgniteMock.java | 8 ++-- 4 files changed, 64 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/754c7337/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index c4a3126..2b70998 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -59,8 +59,11 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadW import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; @@ -173,6 +176,11 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I /** {@inheritDoc} */ @Override public boolean publicApiActiveState(boolean waitForTransition) { + return publicApiActiveStateAsync(waitForTransition).get(); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> publicApiActiveStateAsync(boolean asyncWaitForTransition) { if (ctx.isDaemon()) return sendComputeCheckGlobalState(); @@ -184,32 +192,34 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I Boolean transitionRes = globalState.transitionResult(); if (transitionRes != null) - return transitionRes; + return new IgniteFinishedFutureImpl<>(transitionRes); else { - if (waitForTransition) { - GridFutureAdapter<Void> fut = transitionFuts.get(globalState.transitionRequestId()); + GridFutureAdapter<Void> fut = transitionFuts.get(globalState.transitionRequestId()); + if (fut != null) { + if (asyncWaitForTransition) { + return new IgniteFutureImpl<>(fut.chain(new C1<IgniteInternalFuture<Void>, Boolean>() { + @Override public Boolean apply(IgniteInternalFuture<Void> fut) { + Boolean res = globalState.transitionResult(); - if (fut != null) { - try { - fut.get(); - } - catch (IgniteCheckedException ex) { - throw new IgniteException(ex); - } + assert res != null; + + return res; + } + })); } + else + return new IgniteFinishedFutureImpl<>(false); + } - transitionRes = globalState.transitionResult(); + transitionRes = globalState.transitionResult(); - assert transitionRes != null; + assert transitionRes != null; - return transitionRes; - } - else - return false; + return new IgniteFinishedFutureImpl<>(transitionRes); } } else - return globalState.active(); + return new IgniteFinishedFutureImpl<>(globalState.active()); } /** {@inheritDoc} */ @@ -1066,7 +1076,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I * * @return Cluster state, {@code True} if cluster active, {@code False} if inactive. */ - private boolean sendComputeCheckGlobalState() { + private IgniteFuture<Boolean> sendComputeCheckGlobalState() { AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); if (log.isInfoEnabled()) { @@ -1079,11 +1089,11 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I ClusterGroupAdapter clusterGroupAdapter = (ClusterGroupAdapter)ctx.cluster().get().forServers(); if (F.isEmpty(clusterGroupAdapter.nodes())) - return false; + return new IgniteFinishedFutureImpl<>(false); IgniteCompute comp = clusterGroupAdapter.compute(); - return comp.call(new IgniteCallable<Boolean>() { + return comp.callAsync(new IgniteCallable<Boolean>() { @IgniteInstanceResource private Ignite ig; http://git-wip-us.apache.org/repos/asf/ignite/blob/754c7337/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java index bc72a51..d71b4cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.GridProcessor; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.StateChangeRequest; +import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; /** @@ -41,6 +42,11 @@ public interface IGridClusterStateProcessor extends GridProcessor { boolean publicApiActiveState(boolean waitForTransition); /** + * @return Cluster state to be used on public API. + */ + IgniteFuture<Boolean> publicApiActiveStateAsync(boolean waitForTransition); + + /** * @param discoCache Discovery data cache. * @return If transition is in progress returns future which is completed when transition finishes. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/754c7337/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index 9007472..313f6c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@ -28,6 +28,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -54,6 +55,7 @@ import org.apache.ignite.internal.GridTaskSessionImpl; import org.apache.ignite.internal.GridTaskSessionRequest; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteDeploymentCheckedException; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException; import org.apache.ignite.internal.managers.communication.GridIoManager; @@ -69,6 +71,7 @@ import org.apache.ignite.internal.util.lang.GridPeerDeployAware; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; @@ -187,7 +190,24 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha /** {@inheritDoc} */ @SuppressWarnings("TooBroadScope") @Override public void onKernalStop(boolean cancel) { - lock.writeLock(); + boolean interrupted = false; + + while (true) { + try { + if (lock.tryWriteLock(1, TimeUnit.SECONDS)) + break; + else { + LT.warn(log, "Still waiting to acquire write lock on stop"); + + U.sleep(50); + } + } + catch (IgniteInterruptedCheckedException | InterruptedException e) { + LT.warn(log, "Stopping thread was interrupted while waiting for write lock (will wait anyway)"); + + interrupted = true; + } + } try { stopping = true; @@ -196,6 +216,9 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha } finally { lock.writeUnlock(); + + if (interrupted) + Thread.currentThread().interrupt(); } startLatch.countDown(); http://git-wip-us.apache.org/repos/asf/ignite/blob/754c7337/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java index a0ce285..5b25116 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java @@ -182,13 +182,13 @@ public class IgfsIgniteMock implements IgniteEx { return null; } - @Override - public boolean isRebalanceEnabled() { + /** {@inheritDoc} */ + @Override public boolean isRebalanceEnabled() { return true; } - @Override - public void rebalanceEnabled(boolean rebalanceEnabled) { + /** {@inheritDoc} */ + @Override public void rebalanceEnabled(boolean rebalanceEnabled) { throwUnsupported(); }