Repository: ignite
Updated Branches:
  refs/heads/master b18c28e4b -> f59d29b95


http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
index 16676c8..55740a4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
@@ -18,18 +18,72 @@
 package org.apache.ignite.internal.worker;
 
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.util.worker.GridWorkerListener;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_BLOCKED;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 
 /**
- * Workers registry.
+ * Workers registry. Maintains a set of workers currently running.
+ * Can perform periodic liveness checks for these workers on behalf of any of 
them.
  */
 public class WorkersRegistry implements GridWorkerListener {
+    /** */
+    private static final long DFLT_CHECK_INTERVAL = 3_000;
+
     /** Registered workers. */
     private final ConcurrentMap<String, GridWorker> registeredWorkers = new 
ConcurrentHashMap<>();
 
+    /** Whether workers' liveness checking enabled or not. */
+    private volatile boolean livenessCheckEnabled = true;
+
+    /** Points to the next worker to check. */
+    private volatile Iterator<Map.Entry<String, GridWorker>> checkIter = 
registeredWorkers.entrySet().iterator();
+
+    /** It's safe to omit 'volatile' due to memory effects of lastChecker. */
+    private long lastCheckTs = U.currentTimeMillis();
+
+    /** Last thread that performed the check. Null reference denotes "checking 
is in progress". */
+    private final AtomicReference<Thread> lastChecker = new 
AtomicReference<>(Thread.currentThread());
+
+    /** */
+    private final IgniteBiInClosure<GridWorker, FailureType> workerFailedHnd;
+
+    /** Worker heartbeat timeout in milliseconds, when exceeded, worker is 
considered as blocked. */
+    private final long heartbeatTimeout;
+
+    /** Time in milliseconds between successive workers checks. */
+    private final long checkInterval;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * @param workerFailedHnd Closure to invoke on worker failure.
+     * @param heartbeatTimeout Maximum allowed worker heartbeat interval in 
milliseconds, should be positive.
+     */
+    public WorkersRegistry(
+        @NotNull IgniteBiInClosure<GridWorker, FailureType> workerFailedHnd,
+        long heartbeatTimeout,
+        IgniteLogger log) {
+        this.workerFailedHnd = workerFailedHnd;
+        this.heartbeatTimeout = heartbeatTimeout;
+        this.checkInterval = Math.min(DFLT_CHECK_INTERVAL, heartbeatTimeout);
+        this.log = log;
+    }
+
     /**
      * Adds worker to the registry.
      *
@@ -38,6 +92,8 @@ public class WorkersRegistry implements GridWorkerListener {
     public void register(GridWorker w) {
         if (registeredWorkers.putIfAbsent(w.runner().getName(), w) != null)
             throw new IllegalStateException("Worker is already registered 
[worker=" + w + ']');
+
+        checkIter = registeredWorkers.entrySet().iterator();
     }
 
     /**
@@ -47,6 +103,8 @@ public class WorkersRegistry implements GridWorkerListener {
      */
     public void unregister(String name) {
         registeredWorkers.remove(name);
+
+        checkIter = registeredWorkers.entrySet().iterator();
     }
 
     /**
@@ -68,6 +126,16 @@ public class WorkersRegistry implements GridWorkerListener {
         return registeredWorkers.get(name);
     }
 
+    /** */
+    boolean livenessCheckEnabled() {
+        return livenessCheckEnabled;
+    }
+
+    /** */
+    void livenessCheckEnabled(boolean val) {
+        livenessCheckEnabled = val;
+    }
+
     /** {@inheritDoc} */
     @Override public void onStarted(GridWorker w) {
         register(w);
@@ -77,4 +145,83 @@ public class WorkersRegistry implements GridWorkerListener {
     @Override public void onStopped(GridWorker w) {
         unregister(w.runner().getName());
     }
+
+    /** {@inheritDoc} */
+    @Override public void onIdle(GridWorker w) {
+        if (!livenessCheckEnabled)
+            return;
+
+        Thread prevCheckerThread = lastChecker.get();
+
+        if (prevCheckerThread == null ||
+            U.currentTimeMillis() - lastCheckTs <= checkInterval ||
+            !lastChecker.compareAndSet(prevCheckerThread, null))
+            return;
+
+        try {
+            lastCheckTs = U.currentTimeMillis();
+
+            long workersToCheck = Math.max(registeredWorkers.size() * 
checkInterval / heartbeatTimeout, 1);
+
+            int workersChecked = 0;
+
+            while (workersChecked < workersToCheck) {
+                if (!checkIter.hasNext())
+                    checkIter = registeredWorkers.entrySet().iterator();
+
+                GridWorker worker;
+
+                try {
+                    worker = checkIter.next().getValue();
+                }
+                catch (NoSuchElementException e) {
+                    return;
+                }
+
+                Thread runner = worker.runner();
+
+                if (runner != null && runner != Thread.currentThread() && 
!worker.isCancelled()) {
+                    if (!runner.isAlive()) {
+                        // In normal operation GridWorker implementation 
guarantees:
+                        // worker termination happens before its removal from 
registeredWorkers.
+                        // That is, if worker is dead, but still resides in 
registeredWorkers
+                        // then something went wrong, the only extra thing is 
to test
+                        // whether the iterator refers to actual state of 
registeredWorkers.
+                        GridWorker worker0 = 
registeredWorkers.get(worker.runner().getName());
+
+                        if (worker0 != null && worker0 == worker)
+                            workerFailedHnd.apply(worker, 
SYSTEM_WORKER_TERMINATION);
+                    }
+
+                    long heartbeatDelay = U.currentTimeMillis() - 
worker.heartbeatTs();
+
+                    if (heartbeatDelay > heartbeatTimeout) {
+                        GridWorker worker0 = 
registeredWorkers.get(worker.runner().getName());
+
+                        if (worker0 != null && worker0 == worker) {
+                            log.error("Blocked system-critical thread has been 
detected. " +
+                                "This can lead to cluster-wide undefined 
behaviour " +
+                                "[threadName=" + worker.name() + ", 
blockedFor=" + heartbeatDelay / 1000 + "s]");
+
+                            U.dumpThread(worker.runner(), log);
+
+                            workerFailedHnd.apply(worker, 
SYSTEM_WORKER_BLOCKED);
+                        }
+
+                        // Iterator should not be reset:
+                        // otherwise we'll never iterate beyond the blocked 
worker,
+                        // that may stay in the map for indefinite time.
+                    }
+                }
+
+                if (runner != Thread.currentThread())
+                    workersChecked++;
+            }
+        }
+        finally {
+            boolean set = lastChecker.compareAndSet(null, 
Thread.currentThread());
+
+            assert set;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/main/java/org/apache/ignite/mxbean/WorkersControlMXBean.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/mxbean/WorkersControlMXBean.java 
b/modules/core/src/main/java/org/apache/ignite/mxbean/WorkersControlMXBean.java
index b999ab7..18b0084 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/mxbean/WorkersControlMXBean.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/mxbean/WorkersControlMXBean.java
@@ -47,6 +47,13 @@ public interface WorkersControlMXBean {
     )
     public boolean terminateWorker(String name);
 
+    /** */
+    @MXBeanDescription("Whether workers check each other's health.")
+    public boolean getHealthMonitoringEnabled();
+
+    /** */
+    public void setHealthMonitoringEnabled(boolean val);
+
     /**
      * Stops thread by {@code name}, if exists and unique.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4ab1dd4..877d2be 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -106,11 +106,11 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.internal.worker.WorkersRegistry;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
@@ -4126,6 +4126,13 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
     }
 
     /**
+     * @param ignite Ignite.
+     */
+    private static WorkersRegistry getWorkersRegistry(Ignite ignite) {
+        return ignite instanceof IgniteEx ? 
((IgniteEx)ignite).context().workersRegistry() : null;
+    }
+
+    /**
      *
      */
     private class DiscoveryListener implements GridLocalEventListener, 
HighPriorityListener {
@@ -4260,11 +4267,10 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
          * @param log Logger.
          */
         private CommunicationWorker(String igniteInstanceName, IgniteLogger 
log) {
-            super(igniteInstanceName, "tcp-comm-worker", log,
-                ignite instanceof IgniteEx ? 
((IgniteEx)ignite).context().workersRegistry() : null);
+            super(igniteInstanceName, "tcp-comm-worker", log, 
getWorkersRegistry(ignite));
         }
 
-        /** {@inheritDoc} */
+        /** */
         @Override protected void body() throws InterruptedException {
             if (log.isDebugEnabled())
                 log.debug("Tcp communication worker has been started.");
@@ -4273,12 +4279,23 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
 
             try {
                 while (!isCancelled()) {
-                    DisconnectedSessionInfo disconnectData = 
q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
+                    DisconnectedSessionInfo disconnectData;
+
+                    blockingSectionBegin();
+
+                    try {
+                        disconnectData = q.poll(idleConnTimeout, 
TimeUnit.MILLISECONDS);
+                    }
+                    finally {
+                        blockingSectionEnd();
+                    }
 
                     if (disconnectData != null)
                         processDisconnect(disconnectData);
                     else
                         processIdle();
+
+                    onIdle();
                 }
             }
             catch (Throwable t) {
@@ -4295,7 +4312,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                     if (err instanceof OutOfMemoryError)
                         ((IgniteEx)ignite).context().failure().process(new 
FailureContext(CRITICAL_ERROR, err));
                     else if (err != null)
-                        ((IgniteEx)ignite).context().failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
+                        ((IgniteEx)ignite).context().failure().process(
+                            new FailureContext(SYSTEM_WORKER_TERMINATION, 
err));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 673290e..faaaff7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -196,7 +196,7 @@ class ClientImpl extends TcpDiscoveryImpl {
     private final Timer timer = new Timer("TcpDiscoverySpi.timer");
 
     /** */
-    protected MessageWorker msgWorker;
+    private MessageWorker msgWorker;
 
     /** Force fail message for local node. */
     private TcpDiscoveryNodeFailedMessage forceFailMsg;
@@ -539,14 +539,20 @@ class ClientImpl extends TcpDiscoveryImpl {
      * @param prevAddr If reconnect is in progress, then previous address of 
the router the client was connected to
      *      and {@code null} otherwise.
      * @param timeout Timeout.
+     * @param beforeEachSleep code to be run before each sleep span.
+     * @param afterEachSleep code to be run before each sleep span.
      * @return Opened socket or {@code null} if timeout.
      * @throws InterruptedException If interrupted.
      * @throws IgniteSpiException If failed.
      * @see TcpDiscoverySpi#joinTimeout
      */
     @SuppressWarnings("BusyWait")
-    @Nullable private T2<SocketStream, Boolean> joinTopology(InetSocketAddress 
prevAddr, long timeout)
-        throws IgniteSpiException, InterruptedException {
+    @Nullable private T2<SocketStream, Boolean> joinTopology(
+        InetSocketAddress prevAddr,
+        long timeout,
+        @Nullable Runnable beforeEachSleep,
+        @Nullable Runnable afterEachSleep
+    ) throws IgniteSpiException, InterruptedException {
         List<InetSocketAddress> addrs = null;
 
         long startTime = U.currentTimeMillis();
@@ -573,7 +579,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                             "Will retry every " + spi.getReconnectDelay() + " 
ms. " +
                             "Change 'reconnectDelay' to configure the 
frequency of retries.", true);
 
-                    Thread.sleep(spi.getReconnectDelay());
+                    sleepEx(spi.getReconnectDelay(), beforeEachSleep, 
afterEachSleep);
                 }
             }
 
@@ -639,18 +645,32 @@ class ClientImpl extends TcpDiscoveryImpl {
                 if (log.isDebugEnabled())
                     log.debug("Will wait before retry join.");
 
-                Thread.sleep(spi.getReconnectDelay());
+                sleepEx(spi.getReconnectDelay(), beforeEachSleep, 
afterEachSleep);
             }
             else if (addrs.isEmpty()) {
                 LT.warn(log, "Failed to connect to any address from IP finder 
(will retry to join topology " +
                     "every " + spi.getReconnectDelay() + " ms; change 
'reconnectDelay' to configure the frequency " +
                     "of retries): " + toOrderedList(addrs0), true);
 
-                Thread.sleep(spi.getReconnectDelay());
+                sleepEx(spi.getReconnectDelay(), beforeEachSleep, 
afterEachSleep);
             }
         }
     }
 
+    /** */
+    private static void sleepEx(long millis, Runnable before, Runnable after) 
throws InterruptedException {
+        if (before != null)
+            before.run();
+
+        try {
+            Thread.sleep(millis);
+        }
+        finally {
+            if (after != null)
+                after.run();
+        }
+    }
+
     /**
      * @param recon {@code True} if reconnects.
      * @param addr Address.
@@ -1486,7 +1506,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             try {
                 while (true) {
-                    T2<SocketStream, Boolean> joinRes = joinTopology(prevAddr, 
timeout);
+                    T2<SocketStream, Boolean> joinRes = joinTopology(prevAddr, 
timeout, null, null);
 
                     if (joinRes == null) {
                         if (join) {
@@ -1642,13 +1662,26 @@ class ClientImpl extends TcpDiscoveryImpl {
         @Override protected void body() throws InterruptedException {
             state = STARTING;
 
+            updateHeartbeat();
+
             spi.stats.onJoinStarted();
 
             try {
                 tryJoin();
 
                 while (true) {
-                    Object msg = queue.take();
+                    onIdle();
+
+                    Object msg;
+
+                    blockingSectionBegin();
+
+                    try {
+                        msg = queue.take();
+                    }
+                    finally {
+                        blockingSectionEnd();
+                    }
 
                     if (msg == JOIN_TIMEOUT) {
                         if (state == STARTING) {
@@ -1969,7 +2002,17 @@ class ClientImpl extends TcpDiscoveryImpl {
             T2<SocketStream, Boolean> joinRes;
 
             try {
-                joinRes = joinTopology(null, spi.joinTimeout);
+                joinRes = joinTopology(null, spi.joinTimeout,
+                    new Runnable() {
+                        @Override public void run() {
+                            blockingSectionBegin();
+                        }
+                    },
+                    new Runnable() {
+                        @Override public void run() {
+                            blockingSectionEnd();
+                        }
+                    });
             }
             catch (IgniteSpiException e) {
                 joinError(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f82af61..778e8d7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -97,6 +97,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.util.worker.GridWorkerListener;
+import org.apache.ignite.internal.worker.WorkersRegistry;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteProductVersion;
@@ -2153,6 +2154,11 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
     }
 
+    /** */
+    private static WorkersRegistry getWorkerRegistry(TcpDiscoverySpi spi) {
+        return spi.ignite() instanceof IgniteEx ? 
((IgniteEx)spi.ignite()).context().workersRegistry() : null;
+    }
+
     /**
      * Discovery messages history used for client reconnect.
      */
@@ -2647,10 +2653,15 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param log Logger.
          */
         private RingMessageWorker(IgniteLogger log) {
-            super("tcp-disco-msg-worker", log, 10,
-                spi.ignite() instanceof IgniteEx ? 
((IgniteEx)spi.ignite()).context().workersRegistry() : null);
+            super("tcp-disco-msg-worker", log, 10, getWorkerRegistry(spi));
 
             initConnectionCheckThreshold();
+
+            setBeforeEachPollAction(() -> {
+                updateHeartbeat();
+
+                onIdle();
+            });
         }
 
         /**
@@ -2681,8 +2692,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 log.debug("Message has been added to queue: " + msg);
         }
 
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
+        /** */
+        protected void body() throws InterruptedException {
             Throwable err = null;
 
             try {
@@ -5784,8 +5795,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @throws IgniteSpiException In case of error.
          */
         TcpServer(IgniteLogger log) throws IgniteSpiException {
-            super(spi.ignite().name(), "tcp-disco-srvr", log,
-                spi.ignite() instanceof IgniteEx ? 
((IgniteEx)spi.ignite()).context().workersRegistry() : null);
+            super(spi.ignite().name(), "tcp-disco-srvr", log, 
getWorkerRegistry(spi));
 
             int lastPort = spi.locPortRange == 0 ? spi.locPort : spi.locPort + 
spi.locPortRange - 1;
 
@@ -5827,13 +5837,22 @@ class ServerImpl extends TcpDiscoveryImpl {
                 ", addr=" + spi.locHost + ']');
         }
 
-        /** {@inheritDoc} */
+        /** */
         @Override protected void body() {
             Throwable err = null;
 
             try {
                 while (!isCancelled()) {
-                    Socket sock = srvrSock.accept();
+                    Socket sock;
+
+                    blockingSectionBegin();
+
+                    try {
+                        sock = srvrSock.accept();
+                    }
+                    finally {
+                        blockingSectionEnd();
+                    }
 
                     long tstamp = U.currentTimeMillis();
 
@@ -5854,6 +5873,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     reader.start();
 
                     spi.stats.onServerSocketInitialized(U.currentTimeMillis() 
- tstamp);
+
+                    onIdle();
                 }
             }
             catch (IOException e) {
@@ -7055,15 +7076,11 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /** */
-    private class MessageWorkerThreadWithCleanup<T> extends 
MessageWorkerThread {
-        /** */
-        private final MessageWorker worker;
+    private class MessageWorkerThreadWithCleanup<T> extends 
MessageWorkerThread<MessageWorker<T>> {
 
         /** {@inheritDoc} */
         private MessageWorkerThreadWithCleanup(MessageWorker<T> worker, 
IgniteLogger log) {
             super(worker, log);
-
-            this.worker = worker;
         }
 
         /** {@inheritDoc} */
@@ -7085,17 +7102,17 @@ class ServerImpl extends TcpDiscoveryImpl {
     /**
      * Slightly modified {@link IgniteSpiThread} intended to use with message 
workers.
      */
-    private class MessageWorkerThread extends IgniteSpiThread {
+    private class MessageWorkerThread<W extends GridWorker> extends 
IgniteSpiThread {
         /**
          * Backed interrupted flag, once set, it is not affected by further 
{@link Thread#interrupted()} calls.
          */
         private volatile boolean interrupted;
 
         /** */
-        private final GridWorker worker;
+        protected final W worker;
 
         /** {@inheritDoc} */
-        private MessageWorkerThread(GridWorker worker, IgniteLogger log) {
+        private MessageWorkerThread(W worker, IgniteLogger log) {
             super(worker.igniteInstanceName(), worker.name(), log);
 
             this.worker = worker;
@@ -7133,6 +7150,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Polling timeout. */
         private final long pollingTimeout;
 
+        /** */
+        private Runnable beforeEachPoll;
+
         /**
          * @param name Worker name.
          * @param log Logger.
@@ -7150,12 +7170,22 @@ class ServerImpl extends TcpDiscoveryImpl {
             this.pollingTimeout = pollingTimeout;
         }
 
+        /**
+         * @param act action to be executed before each timed queue poll.
+         */
+        void setBeforeEachPollAction(Runnable act) {
+            beforeEachPoll = act;
+        }
+
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
             if (log.isDebugEnabled())
                 log.debug("Message worker started [locNodeId=" + 
getConfiguredNodeId() + ']');
 
             while (!isCancelled()) {
+                if (beforeEachPoll != null)
+                    beforeEachPoll.run();
+
                 T msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS);
 
                 if (msg == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/failure/AbstractFailureHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/failure/AbstractFailureHandlerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/failure/AbstractFailureHandlerTest.java
index dc5f1f5..0484598 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/failure/AbstractFailureHandlerTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/failure/AbstractFailureHandlerTest.java
@@ -41,7 +41,7 @@ public class AbstractFailureHandlerTest extends 
GridCommonAbstractTest {
     /**
      *
      */
-    protected static class DummyFailureHandler implements FailureHandler {
+    protected static class DummyFailureHandler extends AbstractFailureHandler {
         /** Failure. */
         private volatile boolean failure;
 
@@ -49,7 +49,7 @@ public class AbstractFailureHandlerTest extends 
GridCommonAbstractTest {
         private volatile FailureContext ctx;
 
         /** {@inheritDoc} */
-        @Override public boolean onFailure(Ignite ignite, FailureContext 
failureCtx) {
+        @Override protected boolean handle(Ignite ignite, FailureContext 
failureCtx) {
             failure = true;
 
             ctx = failureCtx;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
new file mode 100644
index 0000000..3ca7948
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.failure;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.thread.IgniteThread;
+
+/**
+ * Tests the handling of long blocking operations in system-critical workers.
+ */
+public class SystemWorkersBlockingTest extends GridCommonAbstractTest {
+    /** Handler latch. */
+    private static volatile CountDownLatch hndLatch;
+
+    /** */
+    private static final long FAILURE_DETECTION_TIMEOUT = 5_000;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setFailureHandler(new AbstractFailureHandler() {
+            @Override protected boolean handle(Ignite ignite, FailureContext 
failureCtx) {
+                hndLatch.countDown();
+
+                return false;
+            }
+        });
+
+        cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        hndLatch = new CountDownLatch(1);
+
+        startGrid(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBlockingWorker() throws Exception {
+        IgniteEx ignite = grid(0);
+
+        GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) {
+            @Override protected void body() throws InterruptedException {
+                Thread.sleep(Long.MAX_VALUE);
+            }
+        };
+
+        new IgniteThread(worker).start();
+
+        while (worker.runner() == null)
+            Thread.sleep(10);
+
+        ignite.context().workersRegistry().register(worker);
+
+        
assertTrue(hndLatch.await(ignite.configuration().getFailureDetectionTimeout() * 
2, TimeUnit.MILLISECONDS));
+
+        Thread runner = worker.runner();
+
+        runner.interrupt();
+        runner.join(1000);
+
+        assertFalse(runner.isAlive());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java
 
b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java
index 0df870d..638e6f1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java
@@ -25,11 +25,13 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.worker.WorkersRegistry;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.thread.IgniteThread;
 
 /**
  * Tests system critical workers termination.
@@ -38,6 +40,9 @@ public class SystemWorkersTerminationTest extends 
GridCommonAbstractTest {
     /** Handler latch. */
     private static volatile CountDownLatch hndLatch;
 
+    /** */
+    private static final long FAILURE_DETECTION_TIMEOUT = 5_000;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -53,6 +58,8 @@ public class SystemWorkersTerminationTest extends 
GridCommonAbstractTest {
 
         cfg.setDataStorageConfiguration(dsCfg);
 
+        cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT);
+
         return cfg;
     }
 
@@ -112,6 +119,32 @@ public class SystemWorkersTerminationTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testSyntheticWorkerTermination() throws Exception {
+        hndLatch = new CountDownLatch(1);
+
+        IgniteEx ignite = grid(0);
+
+        GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) {
+            @Override protected void body() throws InterruptedException {
+                
Thread.sleep(ignite.configuration().getFailureDetectionTimeout() / 2);
+            }
+        };
+
+        new IgniteThread(worker).start();
+
+        while (worker.runner() == null)
+            Thread.sleep(10);
+
+        ignite.context().workersRegistry().register(worker);
+
+        worker.runner().join();
+
+        
assertTrue(hndLatch.await(ignite.configuration().getFailureDetectionTimeout() * 
2, TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     private void deleteWorkFiles() throws Exception {
         cleanPersistenceDir();
 
@@ -121,9 +154,9 @@ public class SystemWorkersTerminationTest extends 
GridCommonAbstractTest {
     /**
      * Test failure handler.
      */
-    private class TestFailureHandler implements FailureHandler {
+    private class TestFailureHandler extends AbstractFailureHandler {
         /** {@inheritDoc} */
-        @Override public boolean onFailure(Ignite ignite, FailureContext 
failureCtx) {
+        @Override protected boolean handle(Ignite ignite, FailureContext 
failureCtx) {
             hndLatch.countDown();
 
             return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java 
b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
index 545c9ea..09dce9b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
@@ -24,7 +24,7 @@ import org.apache.ignite.Ignite;
 /**
  * Test failure handler implementation
  */
-public class TestFailureHandler implements FailureHandler {
+public class TestFailureHandler extends AbstractFailureHandler {
     /** Invalidate. */
     private final boolean invalidate;
 
@@ -51,7 +51,7 @@ public class TestFailureHandler implements FailureHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onFailure(Ignite ignite, FailureContext 
failureCtx) {
+    @Override protected boolean handle(Ignite ignite, FailureContext 
failureCtx) {
         this.failureCtx = failureCtx;
 
         if (latch != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
index 9a98a88..8744465 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
@@ -36,6 +36,8 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.AbstractFailureHandler;
+import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.TestFailureHandler;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -215,11 +217,13 @@ public class IgniteClientRejoinTest extends 
GridCommonAbstractTest {
                     String nodeName = "client" + idx;
 
                     IgniteConfiguration cfg = getConfiguration(nodeName)
-                        .setFailureHandler((ignite, failureCtx) -> {
-                            // This should _not_ fire when exchange-worker 
terminates before reconnect.
-                            Runtime.getRuntime().halt(Ignition.KILL_EXIT_CODE);
+                        .setFailureHandler(new AbstractFailureHandler() {
+                            @Override protected boolean handle(Ignite ignite, 
FailureContext failureCtx) {
+                                // This should _not_ fire when exchange-worker 
terminates before reconnect.
+                                
Runtime.getRuntime().halt(Ignition.KILL_EXIT_CODE);
 
-                            return false;
+                                return false;
+                            }
                         });
 
                     return startGrid(nodeName, optimize(cfg), null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
index 40025f6..059b5ee 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
@@ -32,8 +32,8 @@ import org.apache.ignite.configuration.ConnectorConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.AbstractFailureHandler;
 import org.apache.ignite.failure.FailureContext;
-import org.apache.ignite.failure.FailureHandler;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -418,7 +418,7 @@ public class IgnitePdsCorruptedStoreTest extends 
GridCommonAbstractTest {
     /**
      * Dummy failure handler
      */
-    public static class DummyFailureHandler implements FailureHandler {
+    public static class DummyFailureHandler extends AbstractFailureHandler {
         /** Failure. */
         private volatile boolean failure = false;
 
@@ -440,7 +440,7 @@ public class IgnitePdsCorruptedStoreTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public boolean onFailure(Ignite ignite, FailureContext 
failureCtx) {
+        @Override protected boolean handle(Ignite ignite, FailureContext 
failureCtx) {
             failure = true;
             error = failureCtx.error();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java
index daa2aeb..353bc50 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java
@@ -58,6 +58,8 @@ public class IgnitePdsPageSizesTest extends 
GridCommonAbstractTest {
                 .setAffinity(new RendezvousAffinityFunction(false, 32))
         );
 
+        cfg.setFailureDetectionTimeout(20_000);
+
         return cfg;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTaskCancelingTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTaskCancelingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTaskCancelingTest.java
index 7318c25..b36bac0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTaskCancelingTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTaskCancelingTest.java
@@ -25,20 +25,17 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCountDownLatch;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.AbstractFailureHandler;
 import org.apache.ignite.failure.FailureContext;
-import org.apache.ignite.failure.FailureHandler;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
@@ -80,8 +77,8 @@ public class IgnitePdsTaskCancelingTest extends 
GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        cfg.setFailureHandler(new FailureHandler() {
-            @Override public boolean onFailure(Ignite ignite, FailureContext 
failureCtx) {
+        cfg.setFailureHandler(new AbstractFailureHandler() {
+            @Override protected boolean handle(Ignite ignite, FailureContext 
failureCtx) {
                 failure.set(true);
 
                 return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
index e84563a..dd10479 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
@@ -80,6 +80,8 @@ public class IgniteWalHistoryReservationsTest extends 
GridCommonAbstractTest {
 
         cfg.setCacheConfiguration(ccfg1);
 
+        cfg.setFailureDetectionTimeout(20_000);
+
         return cfg;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index 587be71..8ddfd44 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -28,8 +28,8 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.AbstractFailureHandler;
 import org.apache.ignite.failure.FailureContext;
-import org.apache.ignite.failure.FailureHandler;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
@@ -838,9 +838,9 @@ public class GridCacheMessageSelfTest extends 
GridCommonAbstractTest {
     /**
      *
      */
-    private static class TestFailureHandler implements FailureHandler {
+    private static class TestFailureHandler extends AbstractFailureHandler {
         /** {@inheritDoc} */
-        @Override public boolean onFailure(Ignite ignite, FailureContext 
failureCtx) {
+        @Override protected boolean handle(Ignite ignite, FailureContext 
failureCtx) {
             failureLatch.countDown();
 
             return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java
index 7d4d802..4c66d24 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java
@@ -22,8 +22,8 @@ import java.util.Collection;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.AbstractFailureHandler;
 import org.apache.ignite.failure.FailureContext;
-import org.apache.ignite.failure.FailureHandler;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -83,9 +83,9 @@ public class TcpDiscoverySegmentationPolicyTest extends 
GridCommonAbstractTest {
     /**
      * Test failure handler.
      */
-    private static class TestFailureHandler implements FailureHandler {
+    private static class TestFailureHandler extends AbstractFailureHandler {
         /** {@inheritDoc} */
-        @Override public boolean onFailure(Ignite ignite, FailureContext 
failureCtx) {
+        @Override protected boolean handle(Ignite ignite, FailureContext 
failureCtx) {
             dfltFailureHndInvoked = true;
 
             return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
index f15a72e..a73367a 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
@@ -994,7 +994,7 @@ public abstract class 
IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
      */
     public void testSetTimes() throws Exception {
         Path fsHome = new Path(primaryFsUri);
-        final Path file = new Path(fsHome, "/heartbeat");
+        final Path file = new Path(fsHome, "/heartbeatTs");
 
         fs.create(file).close();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f59d29b9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index 0b3ded4..fcbad9b 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -126,6 +126,9 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
     /** Log only. */
     private boolean logOnly;
 
+    /** */
+    private long customFailureDetectionTimeout = -1;
+
     /** {@inheritDoc} */
     @Override protected boolean isMultiJvm() {
         return fork;
@@ -184,6 +187,9 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
         if (!getTestIgniteInstanceName(0).equals(gridName))
             cfg.setUserAttributes(F.asMap(HAS_CACHE, true));
 
+        if (customFailureDetectionTimeout > 0)
+            cfg.setFailureDetectionTimeout(customFailureDetectionTimeout);
+
         return cfg;
     }
 
@@ -475,7 +481,11 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
      * @throws Exception if failed.
      */
     public void testHugeCheckpointRecord() throws Exception {
+        long prevFDTimeout = customFailureDetectionTimeout;
+
         try {
+            customFailureDetectionTimeout = 40_000;
+
             final IgniteEx ignite = startGrid(1);
 
             ignite.cluster().active(true);
@@ -516,6 +526,8 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
             fut.get();
         }
         finally {
+            customFailureDetectionTimeout = prevFDTimeout;
+
             stopAllGrids();
         }
     }

Reply via email to