Repository: ignite Updated Branches: refs/heads/master b18c28e4b -> f59d29b95
http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java index 16676c8..55740a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java @@ -18,18 +18,72 @@ package org.apache.ignite.internal.worker; import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.failure.FailureType; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.internal.util.worker.GridWorkerListener; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.jetbrains.annotations.NotNull; + +import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_BLOCKED; +import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; /** - * Workers registry. + * Workers registry. Maintains a set of workers currently running. + * Can perform periodic liveness checks for these workers on behalf of any of them. */ public class WorkersRegistry implements GridWorkerListener { + /** */ + private static final long DFLT_CHECK_INTERVAL = 3_000; + /** Registered workers. */ private final ConcurrentMap<String, GridWorker> registeredWorkers = new ConcurrentHashMap<>(); + /** Whether workers' liveness checking enabled or not. 
*/ + private volatile boolean livenessCheckEnabled = true; + + /** Points to the next worker to check. */ + private volatile Iterator<Map.Entry<String, GridWorker>> checkIter = registeredWorkers.entrySet().iterator(); + + /** It's safe to omit 'volatile' due to memory effects of lastChecker. */ + private long lastCheckTs = U.currentTimeMillis(); + + /** Last thread that performed the check. Null reference denotes "checking is in progress". */ + private final AtomicReference<Thread> lastChecker = new AtomicReference<>(Thread.currentThread()); + + /** */ + private final IgniteBiInClosure<GridWorker, FailureType> workerFailedHnd; + + /** Worker heartbeat timeout in milliseconds, when exceeded, worker is considered as blocked. */ + private final long heartbeatTimeout; + + /** Time in milliseconds between successive workers checks. */ + private final long checkInterval; + + /** Logger. */ + private final IgniteLogger log; + + /** + * @param workerFailedHnd Closure to invoke on worker failure. + * @param heartbeatTimeout Maximum allowed worker heartbeat interval in milliseconds, should be positive. + */ + public WorkersRegistry( + @NotNull IgniteBiInClosure<GridWorker, FailureType> workerFailedHnd, + long heartbeatTimeout, + IgniteLogger log) { + this.workerFailedHnd = workerFailedHnd; + this.heartbeatTimeout = heartbeatTimeout; + this.checkInterval = Math.min(DFLT_CHECK_INTERVAL, heartbeatTimeout); + this.log = log; + } + /** * Adds worker to the registry. 
* @@ -38,6 +92,8 @@ public class WorkersRegistry implements GridWorkerListener { public void register(GridWorker w) { if (registeredWorkers.putIfAbsent(w.runner().getName(), w) != null) throw new IllegalStateException("Worker is already registered [worker=" + w + ']'); + + checkIter = registeredWorkers.entrySet().iterator(); } /** @@ -47,6 +103,8 @@ public class WorkersRegistry implements GridWorkerListener { */ public void unregister(String name) { registeredWorkers.remove(name); + + checkIter = registeredWorkers.entrySet().iterator(); } /** @@ -68,6 +126,16 @@ public class WorkersRegistry implements GridWorkerListener { return registeredWorkers.get(name); } + /** */ + boolean livenessCheckEnabled() { + return livenessCheckEnabled; + } + + /** */ + void livenessCheckEnabled(boolean val) { + livenessCheckEnabled = val; + } + /** {@inheritDoc} */ @Override public void onStarted(GridWorker w) { register(w); @@ -77,4 +145,83 @@ public class WorkersRegistry implements GridWorkerListener { @Override public void onStopped(GridWorker w) { unregister(w.runner().getName()); } + + /** {@inheritDoc} */ + @Override public void onIdle(GridWorker w) { + if (!livenessCheckEnabled) + return; + + Thread prevCheckerThread = lastChecker.get(); + + if (prevCheckerThread == null || + U.currentTimeMillis() - lastCheckTs <= checkInterval || + !lastChecker.compareAndSet(prevCheckerThread, null)) + return; + + try { + lastCheckTs = U.currentTimeMillis(); + + long workersToCheck = Math.max(registeredWorkers.size() * checkInterval / heartbeatTimeout, 1); + + int workersChecked = 0; + + while (workersChecked < workersToCheck) { + if (!checkIter.hasNext()) + checkIter = registeredWorkers.entrySet().iterator(); + + GridWorker worker; + + try { + worker = checkIter.next().getValue(); + } + catch (NoSuchElementException e) { + return; + } + + Thread runner = worker.runner(); + + if (runner != null && runner != Thread.currentThread() && !worker.isCancelled()) { + if (!runner.isAlive()) { + // In 
normal operation GridWorker implementation guarantees: + // worker termination happens before its removal from registeredWorkers. + // That is, if worker is dead, but still resides in registeredWorkers + // then something went wrong, the only extra thing is to test + // whether the iterator refers to actual state of registeredWorkers. + GridWorker worker0 = registeredWorkers.get(worker.runner().getName()); + + if (worker0 != null && worker0 == worker) + workerFailedHnd.apply(worker, SYSTEM_WORKER_TERMINATION); + } + + long heartbeatDelay = U.currentTimeMillis() - worker.heartbeatTs(); + + if (heartbeatDelay > heartbeatTimeout) { + GridWorker worker0 = registeredWorkers.get(worker.runner().getName()); + + if (worker0 != null && worker0 == worker) { + log.error("Blocked system-critical thread has been detected. " + + "This can lead to cluster-wide undefined behaviour " + + "[threadName=" + worker.name() + ", blockedFor=" + heartbeatDelay / 1000 + "s]"); + + U.dumpThread(worker.runner(), log); + + workerFailedHnd.apply(worker, SYSTEM_WORKER_BLOCKED); + } + + // Iterator should not be reset: + // otherwise we'll never iterate beyond the blocked worker, + // that may stay in the map for indefinite time. 
+ } + } + + if (runner != Thread.currentThread()) + workersChecked++; + } + } + finally { + boolean set = lastChecker.compareAndSet(null, Thread.currentThread()); + + assert set; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/main/java/org/apache/ignite/mxbean/WorkersControlMXBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/WorkersControlMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/WorkersControlMXBean.java index b999ab7..18b0084 100644 --- a/modules/core/src/main/java/org/apache/ignite/mxbean/WorkersControlMXBean.java +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/WorkersControlMXBean.java @@ -47,6 +47,13 @@ public interface WorkersControlMXBean { ) public boolean terminateWorker(String name); + /** */ + @MXBeanDescription("Whether workers check each other's health.") + public boolean getHealthMonitoringEnabled(); + + /** */ + public void setHealthMonitoringEnabled(boolean val); + /** * Stops thread by {@code name}, if exists and unique. 
* http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 4ab1dd4..877d2be 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -106,11 +106,11 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.internal.worker.WorkersRegistry; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; @@ -4126,6 +4126,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } /** + * @param ignite Ignite. + */ + private static WorkersRegistry getWorkersRegistry(Ignite ignite) { + return ignite instanceof IgniteEx ? ((IgniteEx)ignite).context().workersRegistry() : null; + } + + /** * */ private class DiscoveryListener implements GridLocalEventListener, HighPriorityListener { @@ -4260,11 +4267,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * @param log Logger. 
*/ private CommunicationWorker(String igniteInstanceName, IgniteLogger log) { - super(igniteInstanceName, "tcp-comm-worker", log, - ignite instanceof IgniteEx ? ((IgniteEx)ignite).context().workersRegistry() : null); + super(igniteInstanceName, "tcp-comm-worker", log, getWorkersRegistry(ignite)); } - /** {@inheritDoc} */ + /** */ @Override protected void body() throws InterruptedException { if (log.isDebugEnabled()) log.debug("Tcp communication worker has been started."); @@ -4273,12 +4279,23 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati try { while (!isCancelled()) { - DisconnectedSessionInfo disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); + DisconnectedSessionInfo disconnectData; + + blockingSectionBegin(); + + try { + disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); + } + finally { + blockingSectionEnd(); + } if (disconnectData != null) processDisconnect(disconnectData); else processIdle(); + + onIdle(); } } catch (Throwable t) { @@ -4295,7 +4312,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (err instanceof OutOfMemoryError) ((IgniteEx)ignite).context().failure().process(new FailureContext(CRITICAL_ERROR, err)); else if (err != null) - ((IgniteEx)ignite).context().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + ((IgniteEx)ignite).context().failure().process( + new FailureContext(SYSTEM_WORKER_TERMINATION, err)); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 673290e..faaaff7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -196,7 +196,7 @@ class ClientImpl extends TcpDiscoveryImpl { private final Timer timer = new Timer("TcpDiscoverySpi.timer"); /** */ - protected MessageWorker msgWorker; + private MessageWorker msgWorker; /** Force fail message for local node. */ private TcpDiscoveryNodeFailedMessage forceFailMsg; @@ -539,14 +539,20 @@ class ClientImpl extends TcpDiscoveryImpl { * @param prevAddr If reconnect is in progress, then previous address of the router the client was connected to * and {@code null} otherwise. * @param timeout Timeout. + * @param beforeEachSleep code to be run before each sleep span. + * @param afterEachSleep code to be run after each sleep span. * @return Opened socket or {@code null} if timeout. * @throws InterruptedException If interrupted. * @throws IgniteSpiException If failed. * @see TcpDiscoverySpi#joinTimeout */ @SuppressWarnings("BusyWait") - @Nullable private T2<SocketStream, Boolean> joinTopology(InetSocketAddress prevAddr, long timeout) - throws IgniteSpiException, InterruptedException { + @Nullable private T2<SocketStream, Boolean> joinTopology( + InetSocketAddress prevAddr, + long timeout, + @Nullable Runnable beforeEachSleep, + @Nullable Runnable afterEachSleep + ) throws IgniteSpiException, InterruptedException { List<InetSocketAddress> addrs = null; long startTime = U.currentTimeMillis(); @@ -573,7 +579,7 @@ class ClientImpl extends TcpDiscoveryImpl { "Will retry every " + spi.getReconnectDelay() + " ms. 
" + "Change 'reconnectDelay' to configure the frequency of retries.", true); - Thread.sleep(spi.getReconnectDelay()); + sleepEx(spi.getReconnectDelay(), beforeEachSleep, afterEachSleep); } } @@ -639,18 +645,32 @@ class ClientImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Will wait before retry join."); - Thread.sleep(spi.getReconnectDelay()); + sleepEx(spi.getReconnectDelay(), beforeEachSleep, afterEachSleep); } else if (addrs.isEmpty()) { LT.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " + "every " + spi.getReconnectDelay() + " ms; change 'reconnectDelay' to configure the frequency " + "of retries): " + toOrderedList(addrs0), true); - Thread.sleep(spi.getReconnectDelay()); + sleepEx(spi.getReconnectDelay(), beforeEachSleep, afterEachSleep); } } } + /** */ + private static void sleepEx(long millis, Runnable before, Runnable after) throws InterruptedException { + if (before != null) + before.run(); + + try { + Thread.sleep(millis); + } + finally { + if (after != null) + after.run(); + } + } + /** * @param recon {@code True} if reconnects. * @param addr Address. 
@@ -1486,7 +1506,7 @@ class ClientImpl extends TcpDiscoveryImpl { try { while (true) { - T2<SocketStream, Boolean> joinRes = joinTopology(prevAddr, timeout); + T2<SocketStream, Boolean> joinRes = joinTopology(prevAddr, timeout, null, null); if (joinRes == null) { if (join) { @@ -1642,13 +1662,26 @@ class ClientImpl extends TcpDiscoveryImpl { @Override protected void body() throws InterruptedException { state = STARTING; + updateHeartbeat(); + spi.stats.onJoinStarted(); try { tryJoin(); while (true) { - Object msg = queue.take(); + onIdle(); + + Object msg; + + blockingSectionBegin(); + + try { + msg = queue.take(); + } + finally { + blockingSectionEnd(); + } if (msg == JOIN_TIMEOUT) { if (state == STARTING) { @@ -1969,7 +2002,17 @@ class ClientImpl extends TcpDiscoveryImpl { T2<SocketStream, Boolean> joinRes; try { - joinRes = joinTopology(null, spi.joinTimeout); + joinRes = joinTopology(null, spi.joinTimeout, + new Runnable() { + @Override public void run() { + blockingSectionBegin(); + } + }, + new Runnable() { + @Override public void run() { + blockingSectionEnd(); + } + }); } catch (IgniteSpiException e) { joinError(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index f82af61..778e8d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -97,6 +97,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.internal.util.worker.GridWorkerListener; +import 
org.apache.ignite.internal.worker.WorkersRegistry; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteProductVersion; @@ -2153,6 +2154,11 @@ class ServerImpl extends TcpDiscoveryImpl { } } + /** */ + private static WorkersRegistry getWorkerRegistry(TcpDiscoverySpi spi) { + return spi.ignite() instanceof IgniteEx ? ((IgniteEx)spi.ignite()).context().workersRegistry() : null; + } + /** * Discovery messages history used for client reconnect. */ @@ -2647,10 +2653,15 @@ class ServerImpl extends TcpDiscoveryImpl { * @param log Logger. */ private RingMessageWorker(IgniteLogger log) { - super("tcp-disco-msg-worker", log, 10, - spi.ignite() instanceof IgniteEx ? ((IgniteEx)spi.ignite()).context().workersRegistry() : null); + super("tcp-disco-msg-worker", log, 10, getWorkerRegistry(spi)); initConnectionCheckThreshold(); + + setBeforeEachPollAction(() -> { + updateHeartbeat(); + + onIdle(); + }); } /** @@ -2681,8 +2692,8 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Message has been added to queue: " + msg); } - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { + /** */ + protected void body() throws InterruptedException { Throwable err = null; try { @@ -5784,8 +5795,7 @@ class ServerImpl extends TcpDiscoveryImpl { * @throws IgniteSpiException In case of error. */ TcpServer(IgniteLogger log) throws IgniteSpiException { - super(spi.ignite().name(), "tcp-disco-srvr", log, - spi.ignite() instanceof IgniteEx ? ((IgniteEx)spi.ignite()).context().workersRegistry() : null); + super(spi.ignite().name(), "tcp-disco-srvr", log, getWorkerRegistry(spi)); int lastPort = spi.locPortRange == 0 ? 
spi.locPort : spi.locPort + spi.locPortRange - 1; @@ -5827,13 +5837,22 @@ class ServerImpl extends TcpDiscoveryImpl { ", addr=" + spi.locHost + ']'); } - /** {@inheritDoc} */ + /** */ @Override protected void body() { Throwable err = null; try { while (!isCancelled()) { - Socket sock = srvrSock.accept(); + Socket sock; + + blockingSectionBegin(); + + try { + sock = srvrSock.accept(); + } + finally { + blockingSectionEnd(); + } long tstamp = U.currentTimeMillis(); @@ -5854,6 +5873,8 @@ class ServerImpl extends TcpDiscoveryImpl { reader.start(); spi.stats.onServerSocketInitialized(U.currentTimeMillis() - tstamp); + + onIdle(); } } catch (IOException e) { @@ -7055,15 +7076,11 @@ class ServerImpl extends TcpDiscoveryImpl { } /** */ - private class MessageWorkerThreadWithCleanup<T> extends MessageWorkerThread { - /** */ - private final MessageWorker worker; + private class MessageWorkerThreadWithCleanup<T> extends MessageWorkerThread<MessageWorker<T>> { /** {@inheritDoc} */ private MessageWorkerThreadWithCleanup(MessageWorker<T> worker, IgniteLogger log) { super(worker, log); - - this.worker = worker; } /** {@inheritDoc} */ @@ -7085,17 +7102,17 @@ class ServerImpl extends TcpDiscoveryImpl { /** * Slightly modified {@link IgniteSpiThread} intended to use with message workers. */ - private class MessageWorkerThread extends IgniteSpiThread { + private class MessageWorkerThread<W extends GridWorker> extends IgniteSpiThread { /** * Backed interrupted flag, once set, it is not affected by further {@link Thread#interrupted()} calls. */ private volatile boolean interrupted; /** */ - private final GridWorker worker; + protected final W worker; /** {@inheritDoc} */ - private MessageWorkerThread(GridWorker worker, IgniteLogger log) { + private MessageWorkerThread(W worker, IgniteLogger log) { super(worker.igniteInstanceName(), worker.name(), log); this.worker = worker; @@ -7133,6 +7150,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** Polling timeout. 
*/ private final long pollingTimeout; + /** */ + private Runnable beforeEachPoll; + /** * @param name Worker name. * @param log Logger. @@ -7150,12 +7170,22 @@ class ServerImpl extends TcpDiscoveryImpl { this.pollingTimeout = pollingTimeout; } + /** + * @param act action to be executed before each timed queue poll. + */ + void setBeforeEachPollAction(Runnable act) { + beforeEachPoll = act; + } + /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { if (log.isDebugEnabled()) log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']'); while (!isCancelled()) { + if (beforeEachPoll != null) + beforeEachPoll.run(); + T msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS); if (msg == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/failure/AbstractFailureHandlerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/failure/AbstractFailureHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/failure/AbstractFailureHandlerTest.java index dc5f1f5..0484598 100644 --- a/modules/core/src/test/java/org/apache/ignite/failure/AbstractFailureHandlerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/AbstractFailureHandlerTest.java @@ -41,7 +41,7 @@ public class AbstractFailureHandlerTest extends GridCommonAbstractTest { /** * */ - protected static class DummyFailureHandler implements FailureHandler { + protected static class DummyFailureHandler extends AbstractFailureHandler { /** Failure. 
*/ private volatile boolean failure; @@ -49,7 +49,7 @@ public class AbstractFailureHandlerTest extends GridCommonAbstractTest { private volatile FailureContext ctx; /** {@inheritDoc} */ - @Override public boolean onFailure(Ignite ignite, FailureContext failureCtx) { + @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { failure = true; ctx = failureCtx; http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java new file mode 100644 index 0000000..3ca7948 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.failure; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.thread.IgniteThread; + +/** + * Tests the handling of long blocking operations in system-critical workers. + */ +public class SystemWorkersBlockingTest extends GridCommonAbstractTest { + /** Handler latch. */ + private static volatile CountDownLatch hndLatch; + + /** */ + private static final long FAILURE_DETECTION_TIMEOUT = 5_000; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setFailureHandler(new AbstractFailureHandler() { + @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { + hndLatch.countDown(); + + return false; + } + }); + + cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + hndLatch = new CountDownLatch(1); + + startGrid(0); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testBlockingWorker() throws Exception { + IgniteEx ignite = grid(0); + + GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) { + @Override protected void body() throws InterruptedException { + Thread.sleep(Long.MAX_VALUE); + } + }; + + new IgniteThread(worker).start(); + + while (worker.runner() == null) + Thread.sleep(10); + + ignite.context().workersRegistry().register(worker); + + assertTrue(hndLatch.await(ignite.configuration().getFailureDetectionTimeout() * 2, TimeUnit.MILLISECONDS)); + + Thread runner = worker.runner(); + + runner.interrupt(); + runner.join(1000); + + assertFalse(runner.isAlive()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java index 0df870d..638e6f1 100644 --- a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java @@ -25,11 +25,13 @@ import org.apache.ignite.Ignite; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.internal.worker.WorkersRegistry; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.thread.IgniteThread; /** * Tests system critical workers termination. 
@@ -38,6 +40,9 @@ public class SystemWorkersTerminationTest extends GridCommonAbstractTest { /** Handler latch. */ private static volatile CountDownLatch hndLatch; + /** */ + private static final long FAILURE_DETECTION_TIMEOUT = 5_000; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -53,6 +58,8 @@ public class SystemWorkersTerminationTest extends GridCommonAbstractTest { cfg.setDataStorageConfiguration(dsCfg); + cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT); + return cfg; } @@ -112,6 +119,32 @@ public class SystemWorkersTerminationTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testSyntheticWorkerTermination() throws Exception { + hndLatch = new CountDownLatch(1); + + IgniteEx ignite = grid(0); + + GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) { + @Override protected void body() throws InterruptedException { + Thread.sleep(ignite.configuration().getFailureDetectionTimeout() / 2); + } + }; + + new IgniteThread(worker).start(); + + while (worker.runner() == null) + Thread.sleep(10); + + ignite.context().workersRegistry().register(worker); + + worker.runner().join(); + + assertTrue(hndLatch.await(ignite.configuration().getFailureDetectionTimeout() * 2, TimeUnit.MILLISECONDS)); + } + + /** + * @throws Exception If failed. + */ private void deleteWorkFiles() throws Exception { cleanPersistenceDir(); @@ -121,9 +154,9 @@ public class SystemWorkersTerminationTest extends GridCommonAbstractTest { /** * Test failure handler. 
*/ - private class TestFailureHandler implements FailureHandler { + private class TestFailureHandler extends AbstractFailureHandler { /** {@inheritDoc} */ - @Override public boolean onFailure(Ignite ignite, FailureContext failureCtx) { + @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { hndLatch.countDown(); return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java index 545c9ea..09dce9b 100644 --- a/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java @@ -24,7 +24,7 @@ import org.apache.ignite.Ignite; /** * Test failure handler implementation */ -public class TestFailureHandler implements FailureHandler { +public class TestFailureHandler extends AbstractFailureHandler { /** Invalidate. 
*/ private final boolean invalidate; @@ -51,7 +51,7 @@ public class TestFailureHandler implements FailureHandler { } /** {@inheritDoc} */ - @Override public boolean onFailure(Ignite ignite, FailureContext failureCtx) { + @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { this.failureCtx = failureCtx; if (latch != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java index 9a98a88..8744465 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java @@ -36,6 +36,8 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.Ignition; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.AbstractFailureHandler; +import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.TestFailureHandler; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; @@ -215,11 +217,13 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest { String nodeName = "client" + idx; IgniteConfiguration cfg = getConfiguration(nodeName) - .setFailureHandler((ignite, failureCtx) -> { - // This should _not_ fire when exchange-worker terminates before reconnect. - Runtime.getRuntime().halt(Ignition.KILL_EXIT_CODE); + .setFailureHandler(new AbstractFailureHandler() { + @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { + // This should _not_ fire when exchange-worker terminates before reconnect. 
+ Runtime.getRuntime().halt(Ignition.KILL_EXIT_CODE); - return false; + return false; + } }); return startGrid(nodeName, optimize(cfg), null); http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java index 40025f6..059b5ee 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java @@ -32,8 +32,8 @@ import org.apache.ignite.configuration.ConnectorConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.AbstractFailureHandler; import org.apache.ignite.failure.FailureContext; -import org.apache.ignite.failure.FailureHandler; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -418,7 +418,7 @@ public class IgnitePdsCorruptedStoreTest extends GridCommonAbstractTest { /** * Dummy failure handler */ - public static class DummyFailureHandler implements FailureHandler { + public static class DummyFailureHandler extends AbstractFailureHandler { /** Failure. 
*/ private volatile boolean failure = false; @@ -440,7 +440,7 @@ public class IgnitePdsCorruptedStoreTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public boolean onFailure(Ignite ignite, FailureContext failureCtx) { + @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { failure = true; error = failureCtx.error(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java index daa2aeb..353bc50 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java @@ -58,6 +58,8 @@ public class IgnitePdsPageSizesTest extends GridCommonAbstractTest { .setAffinity(new RendezvousAffinityFunction(false, 32)) ); + cfg.setFailureDetectionTimeout(20_000); + return cfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTaskCancelingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTaskCancelingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTaskCancelingTest.java index 7318c25..b36bac0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTaskCancelingTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTaskCancelingTest.java @@ -25,20 +25,17 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Random; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.LockSupport; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.AbstractFailureHandler; import org.apache.ignite.failure.FailureContext; -import org.apache.ignite.failure.FailureHandler; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; @@ -80,8 +77,8 @@ public class IgnitePdsTaskCancelingTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - cfg.setFailureHandler(new FailureHandler() { - @Override public boolean onFailure(Ignite ignite, FailureContext failureCtx) { + cfg.setFailureHandler(new AbstractFailureHandler() { + @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { failure.set(true); return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java index e84563a..dd10479 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java @@ -80,6 +80,8 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest { cfg.setCacheConfiguration(ccfg1); + cfg.setFailureDetectionTimeout(20_000); + return cfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java index 587be71..8ddfd44 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java @@ -28,8 +28,8 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.AbstractFailureHandler; import org.apache.ignite.failure.FailureContext; -import org.apache.ignite.failure.FailureHandler; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.IgniteKernal; import 
org.apache.ignite.internal.managers.communication.GridIoManager; @@ -838,9 +838,9 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest { /** * */ - private static class TestFailureHandler implements FailureHandler { + private static class TestFailureHandler extends AbstractFailureHandler { /** {@inheritDoc} */ - @Override public boolean onFailure(Ignite ignite, FailureContext failureCtx) { + @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { failureLatch.countDown(); return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java index 7d4d802..4c66d24 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java @@ -22,8 +22,8 @@ import java.util.Collection; import org.apache.ignite.Ignite; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.AbstractFailureHandler; import org.apache.ignite.failure.FailureContext; -import org.apache.ignite.failure.FailureHandler; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -83,9 +83,9 @@ public class TcpDiscoverySegmentationPolicyTest extends GridCommonAbstractTest { /** * Test failure handler. 
*/ - private static class TestFailureHandler implements FailureHandler { + private static class TestFailureHandler extends AbstractFailureHandler { /** {@inheritDoc} */ - @Override public boolean onFailure(Ignite ignite, FailureContext failureCtx) { + @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { dfltFailureHndInvoked = true; return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java index f15a72e..a73367a 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java @@ -994,7 +994,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA */ public void testSetTimes() throws Exception { Path fsHome = new Path(primaryFsUri); - final Path file = new Path(fsHome, "/heartbeat"); + final Path file = new Path(fsHome, "/heartbeatTs"); fs.create(file).close(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java index 0b3ded4..fcbad9b 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java @@ -126,6 +126,9 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { /** Log only. */ private boolean logOnly; + /** */ + private long customFailureDetectionTimeout = -1; + /** {@inheritDoc} */ @Override protected boolean isMultiJvm() { return fork; @@ -184,6 +187,9 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { if (!getTestIgniteInstanceName(0).equals(gridName)) cfg.setUserAttributes(F.asMap(HAS_CACHE, true)); + if (customFailureDetectionTimeout > 0) + cfg.setFailureDetectionTimeout(customFailureDetectionTimeout); + return cfg; } @@ -475,7 +481,11 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { * @throws Exception if failed. */ public void testHugeCheckpointRecord() throws Exception { + long prevFDTimeout = customFailureDetectionTimeout; + try { + customFailureDetectionTimeout = 40_000; + final IgniteEx ignite = startGrid(1); ignite.cluster().active(true); @@ -516,6 +526,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { fut.get(); } finally { + customFailureDetectionTimeout = prevFDTimeout; + stopAllGrids(); } }