ignite-7772 System workers critical failures handling

Signed-off-by: Andrey Gura <ag...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c807ae95
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c807ae95
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c807ae95

Branch: refs/heads/ignite-6083
Commit: c807ae952c233cf1a8c0a63d543fafe19c40c6aa
Parents: 05d7092
Author: Andrey Kuznetsov <stku...@gmail.com>
Authored: Tue Apr 10 17:30:12 2018 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Tue Apr 10 17:30:12 2018 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  |  15 +-
 .../GridClientConnectionManagerAdapter.java     |   6 +
 .../impl/GridTcpRouterNioListenerAdapter.java   |   6 +
 .../discovery/GridDiscoveryManager.java         |  16 +-
 .../GridCachePartitionExchangeManager.java      |  12 +-
 .../cache/GridCacheSharedTtlCleanupManager.java |  41 +++--
 .../GridCacheDatabaseSharedManager.java         |  60 +++++--
 .../wal/FileWriteAheadLogManager.java           | 157 ++++++++++++-------
 .../wal/FsyncModeFileWriteAheadLogManager.java  |  34 +++-
 .../timeout/GridTimeoutProcessor.java           | 102 +++++++-----
 .../ignite/internal/util/StripedExecutor.java   |  69 +++++---
 .../ignite/internal/util/nio/GridNioServer.java |  43 ++++-
 .../util/nio/GridNioServerListener.java         |   6 +
 .../util/nio/GridNioServerListenerAdapter.java  |   6 +
 .../communication/tcp/TcpCommunicationSpi.java  |  41 ++++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  51 +++++-
 .../internal/util/StripedExecutorTest.java      |   2 +-
 17 files changed, 501 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 417ba1e..10a0752 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -137,6 +137,7 @@ import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static 
org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;
 import static 
org.apache.ignite.configuration.MemoryConfiguration.DFLT_MEMORY_POLICY_MAX_SIZE;
 import static 
org.apache.ignite.configuration.MemoryConfiguration.DFLT_MEM_PLC_DEFAULT_NAME;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static org.apache.ignite.internal.IgniteComponentType.SPRING;
 import static 
org.apache.ignite.plugin.segmentation.SegmentationPolicy.RESTART_JVM;
 
@@ -1806,7 +1807,13 @@ public class IgnitionEx {
                 cfg.getStripedPoolSize(),
                 cfg.getIgniteInstanceName(),
                 "sys",
-                log);
+                log,
+                new Thread.UncaughtExceptionHandler() {
+                    @Override public void uncaughtException(Thread thread, 
Throwable t) {
+                        if (grid != null)
+                            grid.context().failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, t));
+                    }
+                });
 
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.
@@ -1846,6 +1853,12 @@ public class IgnitionEx {
                 cfg.getIgniteInstanceName(),
                 "data-streamer",
                 log,
+                new Thread.UncaughtExceptionHandler() {
+                    @Override public void uncaughtException(Thread thread, 
Throwable t) {
+                        if (grid != null)
+                            grid.context().failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, t));
+                    }
+                },
                 true);
 
             // Note that we do not pre-start threads here as igfs pool may not 
be needed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index 829b188..fe0453f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -38,6 +38,7 @@ import java.util.logging.Logger;
 import javax.net.ssl.SSLContext;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.client.GridClientClosedException;
 import org.apache.ignite.internal.client.GridClientConfiguration;
 import org.apache.ignite.internal.client.GridClientException;
@@ -656,6 +657,11 @@ public abstract class GridClientConnectionManagerAdapter 
implements GridClientCo
             }
         }
 
+        /** {@inheritDoc} */
+        @Override public void onFailure(FailureType failureType, Throwable 
failure) {
+            // No-op.
+        }
+
         /**
          * Handles client handshake response.
          *

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
index 22f5152..75aa6f2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.client.GridClientException;
 import org.apache.ignite.internal.client.GridClientFuture;
 import org.apache.ignite.internal.client.GridClientFutureListener;
@@ -191,6 +192,11 @@ public abstract class GridTcpRouterNioListenerAdapter 
implements GridNioServerLi
     }
 
     /** {@inheritDoc} */
+    @Override public void onFailure(FailureType failureType, Throwable 
failure) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public void onSessionWriteTimeout(GridNioSession ses) {
         U.warn(log, "Closing NIO session because of write timeout.");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 4c5690e..b0d3256 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -147,6 +147,8 @@ import static 
org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
 import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_REGIONS_OFFHEAP_SIZE;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
@@ -2669,13 +2671,21 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     body0();
                 }
                 catch (InterruptedException e) {
+                    if (!isCancelled)
+                        ctx.failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, e));
+
                     throw e;
                 }
                 catch (Throwable t) {
-                    U.error(log, "Unexpected exception in discovery worker 
thread (ignored).", t);
+                    U.error(log, "Exception in discovery worker thread.", t);
+
+                    if (t instanceof Error) {
+                        FailureType type = t instanceof OutOfMemoryError ? 
CRITICAL_ERROR : SYSTEM_WORKER_TERMINATION;
 
-                    if (t instanceof Error)
-                        throw (Error)t;
+                        ctx.failure().process(new FailureContext(type, t));
+
+                        throw t;
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 77ffce3..e40493f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -123,6 +123,7 @@ import static 
org.apache.ignite.IgniteSystemProperties.getLong;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static 
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
@@ -2274,11 +2275,20 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             try {
                 body0();
             }
+            catch (InterruptedException | IgniteInterruptedCheckedException e) 
{
+                if (!stop)
+                    err = e;
+            }
             catch (Throwable e) {
                 err = e;
             }
             finally {
-                if (!stop)
+                if (err == null && !stop)
+                    err = new IllegalStateException("Thread " + name() + " is 
terminated unexpectedly");
+
+                if (err instanceof OutOfMemoryError)
+                    cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, err));
+                else if (err != null)
                     cctx.kernalContext().failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
index 8f3d738..613e93b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
@@ -20,11 +20,15 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.thread.IgniteThread;
 
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
+
 /**
  * Periodically removes expired entities from caches with {@link 
CacheConfiguration#isEagerTtl()} flag set.
  */
@@ -122,19 +126,38 @@ public class GridCacheSharedTtlCleanupManager extends 
GridCacheSharedManagerAdap
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
-            while (!isCancelled()) {
-                boolean expiredRemains = false;
+            Throwable err = null;
+
+            try {
+                while (!isCancelled()) {
+                    boolean expiredRemains = false;
+
+                    for (GridCacheTtlManager mgr : mgrs) {
+                        if (mgr.expire(CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT))
+                            expiredRemains = true;
 
-                for (GridCacheTtlManager mgr : mgrs) {
-                    if (mgr.expire(CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT))
-                        expiredRemains = true;
+                        if (isCancelled())
+                            return;
+                    }
 
-                    if (isCancelled())
-                        return;
+                    if (!expiredRemains)
+                        U.sleep(CLEANUP_WORKER_SLEEP_INTERVAL);
                 }
+            }
+            catch (Throwable t) {
+                if (!(t instanceof IgniteInterruptedCheckedException))
+                    err = t;
 
-                if (!expiredRemains)
-                    U.sleep(CLEANUP_WORKER_SLEEP_INTERVAL);
+                throw t;
+            }
+            finally {
+                if (err == null && !isCancelled)
+                    err = new IllegalStateException("Thread " + name() + " is 
terminated unexpectedly");
+
+                if (err instanceof OutOfMemoryError)
+                    cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, err));
+                else if (err != null)
+                    cctx.kernalContext().failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 70fc688..caf27b7 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -165,6 +165,8 @@ import static java.nio.file.StandardOpenOption.READ;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static 
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
 
 /**
@@ -2787,32 +2789,58 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         }
 
         /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
-            while (!isCancelled()) {
-                waitCheckpointEvent();
+        @Override protected void body() {
+            Throwable err = null;
 
-                GridFutureAdapter<Void> enableChangeApplied = 
GridCacheDatabaseSharedManager.this.enableChangeApplied;
+            try {
+                while (!isCancelled()) {
+                    waitCheckpointEvent();
 
-                if (enableChangeApplied != null) {
-                    enableChangeApplied.onDone();
+                    GridFutureAdapter<Void> enableChangeApplied = 
GridCacheDatabaseSharedManager.this.enableChangeApplied;
 
-                    GridCacheDatabaseSharedManager.this.enableChangeApplied = 
null;
-                }
+                    if (enableChangeApplied != null) {
+                        enableChangeApplied.onDone();
 
-                if (checkpointsEnabled)
-                    doCheckpoint();
-                else {
-                    synchronized (this) {
-                        scheduledCp.nextCpTs = U.currentTimeMillis() + 
checkpointFreq;
+                        
GridCacheDatabaseSharedManager.this.enableChangeApplied = null;
+                    }
+
+                    if (checkpointsEnabled)
+                        doCheckpoint();
+                    else {
+                        synchronized (this) {
+                            scheduledCp.nextCpTs = U.currentTimeMillis() + 
checkpointFreq;
+                        }
                     }
                 }
             }
+            catch (Throwable t) {
+                err = t;
+
+                scheduledCp.cpFinishFut.onDone(t);
+
+                throw t;
+            }
+            finally {
+                if (err == null && !(stopping && isCancelled))
+                    err = new IllegalStateException("Thread " + name() + " is 
terminated unexpectedly");
+
+                if (err instanceof OutOfMemoryError)
+                    cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, err));
+                else if (err != null)
+                    cctx.kernalContext().failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
+            }
 
             // Final run after the cancellation.
-            if (checkpointsEnabled && !shutdownNow)
-                doCheckpoint();
+            if (checkpointsEnabled && !shutdownNow) {
+                try {
+                    doCheckpoint();
 
-            scheduledCp.cpFinishFut.onDone(new NodeStoppingException("Node is 
stopping."));
+                    scheduledCp.cpFinishFut.onDone(new 
NodeStoppingException("Node is stopping."));
+                }
+                catch (Throwable e) {
+                    scheduledCp.cpFinishFut.onDone(e);
+                }
+            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 2fff481..a40811b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -117,6 +117,8 @@ import static java.nio.file.StandardOpenOption.WRITE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
 import static org.apache.ignite.configuration.WALMode.LOG_ONLY;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT;
 import static org.apache.ignite.internal.util.IgniteUtils.findField;
@@ -682,7 +684,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         catch (IgniteCheckedException e) {
             U.error(log, "Unable to perform segment rollover: " + 
e.getMessage(), e);
 
-            cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
+            cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, e));
         }
     }
 
@@ -1234,7 +1236,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         catch (IOException e) {
             StorageException se = new StorageException("Unable to initialize 
WAL segment", e);
 
-            cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, se));
+            cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, se));
 
             throw se;
         }
@@ -1499,6 +1501,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 }
             }
 
+            Throwable err = null;
+
             try {
                 synchronized (this) {
                     while (curAbsWalIdx == -1 && !stopped)
@@ -1560,6 +1564,18 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             catch (InterruptedException ignore) {
                 Thread.currentThread().interrupt();
             }
+            catch (Throwable t) {
+                err = t;
+            }
+            finally {
+                if (err == null && !stopped)
+                    err = new IllegalStateException("Thread " + getName() + " 
is terminated unexpectedly");
+
+                if (err instanceof OutOfMemoryError)
+                    cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, err));
+                else if (err != null)
+                    cctx.kernalContext().failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
+            }
         }
 
         /**
@@ -1884,8 +1900,6 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 }
                 catch (IgniteCheckedException | IOException e) {
                     U.error(log, "Unexpected error during WAL compression", e);
-
-                    cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
                 }
                 catch (InterruptedException ignore) {
                     Thread.currentThread().interrupt();
@@ -2005,6 +2019,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         /** {@inheritDoc} */
         @Override public void run() {
+            Throwable err = null;
+
             while (!Thread.currentThread().isInterrupted() && !stopped) {
                 try {
                     long segmentToDecompress = segmentsQueue.take();
@@ -2034,10 +2050,17 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 catch (InterruptedException ignore) {
                     Thread.currentThread().interrupt();
                 }
-                catch (IOException e) {
-                    U.error(log, "Unexpected error during WAL decompression", 
e);
+                catch (Throwable t) {
+                    err = t;
+                }
+                finally {
+                    if (err == null && !stopped)
+                        err = new IllegalStateException("Thread " + getName() 
+ " is terminated unexpectedly");
 
-                    cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
+                    if (err instanceof OutOfMemoryError)
+                        cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, err));
+                    else if (err != null)
+                        cctx.kernalContext().failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
                 }
             }
         }
@@ -3146,78 +3169,94 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         /** {@inheritDoc} */
         @Override public void run() {
-            while (!shutdown && !Thread.currentThread().isInterrupted()) {
-                while (waiters.isEmpty()) {
-                    if (!shutdown)
-                        LockSupport.park();
-                    else {
-                        unparkWaiters(Long.MAX_VALUE);
-
-                        return;
-                    }
-                }
+            Throwable err = null;
 
-                Long pos = null;
+            try {
+                while (!shutdown && !Thread.currentThread().isInterrupted()) {
+                    while (waiters.isEmpty()) {
+                        if (!shutdown)
+                            LockSupport.park();
+                        else {
+                            unparkWaiters(Long.MAX_VALUE);
 
-                for (Long val : waiters.values()) {
-                    if (val > Long.MIN_VALUE)
-                        pos = val;
-                }
+                            return;
+                        }
+                    }
 
-                if (pos == null)
-                    continue;
-                else if (pos < UNCONDITIONAL_FLUSH) {
-                    try {
-                        assert pos == FILE_CLOSE || pos == FILE_FORCE : pos;
+                    Long pos = null;
 
-                        if (pos == FILE_CLOSE)
-                            currHnd.fileIO.close();
-                        else if (pos == FILE_FORCE)
-                            currHnd.fileIO.force();
+                    for (Long val : waiters.values()) {
+                        if (val > Long.MIN_VALUE)
+                            pos = val;
                     }
-                    catch (IOException e) {
-                        log.error("Exception in WAL writer thread: ", e);
 
-                        err = e;
+                    if (pos == null)
+                        continue;
+                    else if (pos < UNCONDITIONAL_FLUSH) {
+                        try {
+                            assert pos == FILE_CLOSE || pos == FILE_FORCE : 
pos;
 
-                        unparkWaiters(Long.MAX_VALUE);
+                            if (pos == FILE_CLOSE)
+                                currHnd.fileIO.close();
+                            else if (pos == FILE_FORCE)
+                                currHnd.fileIO.force();
+                        }
+                        catch (IOException e) {
+                            log.error("Exception in WAL writer thread: ", e);
 
-                        return;
-                    }
+                            err = e;
 
-                    unparkWaiters(pos);
-                }
+                            unparkWaiters(Long.MAX_VALUE);
 
-                List<SegmentedRingByteBuffer.ReadSegment> segs = 
currentHandle().buf.poll(pos);
+                            return;
+                        }
 
-                if (segs == null) {
-                    unparkWaiters(pos);
+                        unparkWaiters(pos);
+                    }
 
-                    continue;
-                }
+                    List<SegmentedRingByteBuffer.ReadSegment> segs = 
currentHandle().buf.poll(pos);
 
-                for (int i = 0; i < segs.size(); i++) {
-                    SegmentedRingByteBuffer.ReadSegment seg = segs.get(i);
+                    if (segs == null) {
+                        unparkWaiters(pos);
 
-                    try {
-                        writeBuffer(seg.position(), seg.buffer());
+                        continue;
                     }
-                    catch (Throwable e) {
-                        log.error("Exception in WAL writer thread: ", e);
 
-                        err = e;
-                    }
-                    finally {
-                        seg.release();
+                    for (int i = 0; i < segs.size(); i++) {
+                        SegmentedRingByteBuffer.ReadSegment seg = segs.get(i);
+
+                        try {
+                            writeBuffer(seg.position(), seg.buffer());
+                        }
+                        catch (Throwable e) {
+                            log.error("Exception in WAL writer thread: ", e);
+
+                            err = e;
+                        }
+                        finally {
+                            seg.release();
 
-                        long p = pos <= UNCONDITIONAL_FLUSH || err != null ? 
Long.MAX_VALUE : currentHandle().written;
+                            long p = pos <= UNCONDITIONAL_FLUSH || err != null 
? Long.MAX_VALUE : currentHandle().written;
 
-                        unparkWaiters(p);
+                            unparkWaiters(p);
+                        }
                     }
                 }
+
+                unparkWaiters(Long.MAX_VALUE);
             }
+            catch (Throwable t) {
+                err = t;
+            }
+            finally {
+                if (err == null && !shutdown)
+                    err = new IllegalStateException("Thread " + getName() + " 
is terminated unexpectedly");
 
-            unparkWaiters(Long.MAX_VALUE);
+                if (err instanceof OutOfMemoryError)
+                    cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, err));
+                else if (err != null)
+                    cctx.kernalContext().failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
+            }
         }
 
         /**
@@ -3283,7 +3322,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             Throwable err = walWriter.err;
 
             if (err != null)
-                cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, err));
+                cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, err));
 
             if (expPos == UNCONDITIONAL_FLUSH)
                 expPos = (currentHandle().buf.tail());
@@ -3372,7 +3411,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             catch (IOException e) {
                 StorageException se = new StorageException("Unable to write", 
e);
 
-                cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, se));
+                cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, se));
 
                 throw se;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
index 59196bb..c7d2c11 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
@@ -110,6 +110,7 @@ import static java.nio.file.StandardOpenOption.READ;
 import static java.nio.file.StandardOpenOption.WRITE;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
 import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 
 /**
  * File WAL manager.
@@ -1338,6 +1339,8 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
                 }
             }
 
+            Throwable err = null;
+
             try {
                 synchronized (this) {
                     while (curAbsWalIdx == -1 && !stopped)
@@ -1399,6 +1402,18 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
             catch (InterruptedException ignore) {
                 Thread.currentThread().interrupt();
             }
+            catch (Throwable t) {
+                err = t;
+            }
+            finally {
+                if (err == null && !stopped)
+                    err = new IllegalStateException("Thread " + getName() + " 
is terminated unexpectedly");
+
+                if (err instanceof OutOfMemoryError)
+                    cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, err));
+                else if (err != null)
+                    cctx.kernalContext().failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
+            }
         }
 
         /**
@@ -1721,8 +1736,6 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
                 }
                 catch (IgniteCheckedException | IOException e) {
                     U.error(log, "Unexpected error during WAL compression", e);
-
-                    cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, e));
                 }
                 catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
@@ -1814,6 +1827,8 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
 
         /** {@inheritDoc} */
         @Override public void run() {
+            Throwable err = null;
+
             while (!Thread.currentThread().isInterrupted() && !stopped) {
                 try {
                     long segmentToDecompress = segmentsQueue.take();
@@ -1840,13 +1855,20 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
                         
decompressionFutures.remove(segmentToDecompress).onDone();
                     }
                 }
-                catch (InterruptedException e){
+                catch (InterruptedException ignore) {
                     Thread.currentThread().interrupt();
                 }
-                catch (IOException e) {
-                    U.error(log, "Unexpected error during WAL decompression", 
e);
+                catch (Throwable t) {
+                    err = t;
+                }
+                finally {
+                    if (err == null && !stopped)
+                        err = new IllegalStateException("Thread " + getName() 
+ " is terminated unexpectedly");
 
-                    cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, e));
+                    if (err instanceof OutOfMemoryError)
+                        cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, err));
+                    else if (err != null)
+                        cctx.kernalContext().failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index ff6beb4..a09d6fa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
 import java.util.Comparator;
 import java.util.Iterator;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
@@ -32,6 +33,9 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
+
 /**
  * Detects timeout events and processes them.
  */
@@ -146,61 +150,81 @@ public class GridTimeoutProcessor extends 
GridProcessorAdapter {
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
-            while (!isCancelled()) {
-                long now = U.currentTimeMillis();
+            Throwable err = null;
 
-                for (Iterator<GridTimeoutObject> iter = 
timeoutObjs.iterator(); iter.hasNext();) {
-                    GridTimeoutObject timeoutObj = iter.next();
+            try {
+                while (!isCancelled()) {
+                    long now = U.currentTimeMillis();
 
-                    if (timeoutObj.endTime() <= now) {
-                        try {
-                            boolean rmvd = timeoutObjs.remove(timeoutObj);
+                    for (Iterator<GridTimeoutObject> iter = 
timeoutObjs.iterator(); iter.hasNext(); ) {
+                        GridTimeoutObject timeoutObj = iter.next();
 
-                            if (log.isDebugEnabled())
-                                log.debug("Timeout has occurred [obj=" + 
timeoutObj + ", process=" + rmvd + ']');
+                        if (timeoutObj.endTime() <= now) {
+                            try {
+                                boolean rmvd = timeoutObjs.remove(timeoutObj);
 
-                            if (rmvd)
-                                timeoutObj.onTimeout();
-                        }
-                        catch (Throwable e) {
-                            if (isCancelled() && !(e instanceof Error)){
                                 if (log.isDebugEnabled())
-                                    log.debug("Error when executing timeout 
callback: " + timeoutObj);
+                                    log.debug("Timeout has occurred [obj=" + 
timeoutObj + ", process=" + rmvd + ']');
 
-                                return;
+                                if (rmvd)
+                                    timeoutObj.onTimeout();
                             }
+                            catch (Throwable e) {
+                                if (isCancelled() && !(e instanceof Error)) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Error when executing 
timeout callback: " + timeoutObj);
 
-                            U.error(log, "Error when executing timeout 
callback: " + timeoutObj, e);
+                                    return;
+                                }
 
-                            if (e instanceof Error)
-                                throw e;
+                                U.error(log, "Error when executing timeout 
callback: " + timeoutObj, e);
+
+                                if (e instanceof Error)
+                                    throw e;
+                            }
                         }
+                        else
+                            break;
                     }
-                    else
-                        break;
-                }
-
-                synchronized (mux) {
-                    while (!isCancelled()) {
-                        // Access of the first element must be inside of
-                        // synchronization block, so we don't miss out
-                        // on thread notification events sent from
-                        // 'addTimeoutObject(..)' method.
-                        GridTimeoutObject first = timeoutObjs.firstx();
-
-                        if (first != null) {
-                            long waitTime = first.endTime() - 
U.currentTimeMillis();
 
-                            if (waitTime > 0)
-                                mux.wait(waitTime);
+                    synchronized (mux) {
+                        while (!isCancelled()) {
+                            // Access of the first element must be inside of
+                            // synchronization block, so we don't miss out
+                            // on thread notification events sent from
+                            // 'addTimeoutObject(..)' method.
+                            GridTimeoutObject first = timeoutObjs.firstx();
+
+                            if (first != null) {
+                                long waitTime = first.endTime() - 
U.currentTimeMillis();
+
+                                if (waitTime > 0)
+                                    mux.wait(waitTime);
+                                else
+                                    break;
+                            }
                             else
-                                break;
+                                mux.wait(5000);
                         }
-                        else
-                            mux.wait(5000);
                     }
                 }
             }
+            catch (Throwable t) {
+                if (!(t instanceof InterruptedException))
+                    err = t;
+
+                throw t;
+            }
+            finally {
+                if (err == null && !isCancelled)
+                    err = new IllegalStateException("Thread " + name() + " is 
terminated unexpectedly.");
+
+                if (err instanceof OutOfMemoryError)
+                    ctx.failure().process(new FailureContext(CRITICAL_ERROR, 
err));
+                else if (err != null)
+                    ctx.failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
+            }
+
         }
     }
 
@@ -284,4 +308,4 @@ public class GridTimeoutProcessor extends 
GridProcessorAdapter {
             return S.toString(CancelableTask.class, this);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 630d34c..c6383ee 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -64,9 +64,11 @@ public class StripedExecutor implements ExecutorService {
      * @param igniteInstanceName Node name.
      * @param poolName Pool name.
      * @param log Logger.
+     * @param errHnd Exception handler.
      */
-    public StripedExecutor(int cnt, String igniteInstanceName, String 
poolName, final IgniteLogger log) {
-        this(cnt, igniteInstanceName, poolName, log, false);
+    public StripedExecutor(int cnt, String igniteInstanceName, String 
poolName, final IgniteLogger log,
+        Thread.UncaughtExceptionHandler errHnd) {
+        this(cnt, igniteInstanceName, poolName, log, errHnd, false);
     }
 
     /**
@@ -74,9 +76,11 @@ public class StripedExecutor implements ExecutorService {
      * @param igniteInstanceName Node name.
      * @param poolName Pool name.
      * @param log Logger.
+     * @param errHnd Exception handler.
      * @param stealTasks {@code True} to steal tasks.
      */
-    public StripedExecutor(int cnt, String igniteInstanceName, String 
poolName, final IgniteLogger log, boolean stealTasks) {
+    public StripedExecutor(int cnt, String igniteInstanceName, String 
poolName, final IgniteLogger log,
+        Thread.UncaughtExceptionHandler errHnd, boolean stealTasks) {
         A.ensure(cnt > 0, "cnt > 0");
 
         boolean success = false;
@@ -91,15 +95,9 @@ public class StripedExecutor implements ExecutorService {
 
         try {
             for (int i = 0; i < cnt; i++) {
-                stripes[i] = stealTasks ? new StripeConcurrentQueue(
-                    igniteInstanceName,
-                    poolName,
-                    i,
-                    log, stripes) : new StripeConcurrentQueue(
-                        igniteInstanceName,
-                        poolName,
-                        i,
-                        log);
+                stripes[i] = stealTasks
+                    ? new StripeConcurrentQueue(igniteInstanceName, poolName, 
i, log, stripes, errHnd)
+                    : new StripeConcurrentQueue(igniteInstanceName, poolName, 
i, log, errHnd);
             }
 
             for (int i = 0; i < cnt; i++)
@@ -434,22 +432,28 @@ public class StripedExecutor implements ExecutorService {
         /** Thread executing the loop. */
         protected Thread thread;
 
+        /** Exception handler. */
+        private Thread.UncaughtExceptionHandler errHnd;
+
         /**
          * @param igniteInstanceName Ignite instance name.
          * @param poolName Pool name.
          * @param idx Stripe index.
          * @param log Logger.
+         * @param errHnd Exception handler.
          */
         public Stripe(
             String igniteInstanceName,
             String poolName,
             int idx,
-            IgniteLogger log
+            IgniteLogger log,
+            Thread.UncaughtExceptionHandler errHnd
         ) {
             this.igniteInstanceName = igniteInstanceName;
             this.poolName = poolName;
             this.idx = idx;
             this.log = log;
+            this.errHnd = errHnd;
         }
 
         /**
@@ -463,6 +467,8 @@ public class StripedExecutor implements ExecutorService {
                 idx,
                 GridIoPolicy.UNDEFINED);
 
+            thread.setUncaughtExceptionHandler(errHnd);
+
             thread.start();
         }
 
@@ -518,9 +524,19 @@ public class StripedExecutor implements ExecutorService {
                     return;
                 }
                 catch (Throwable e) {
+                    if (e instanceof OutOfMemoryError) {
+                        // Re-throwing to exploit uncaught exception handler.
+                        throw e;
+                    }
+
                     U.error(log, "Failed to execute runnable.", e);
                 }
             }
+
+            if (!stopping) {
+                throw new IllegalStateException("Thread " + 
Thread.currentThread().getName() +
+                    " is terminated unexpectedly");
+            }
         }
 
         /**
@@ -576,14 +592,16 @@ public class StripedExecutor implements ExecutorService {
          * @param poolName Pool name.
          * @param idx Stripe index.
          * @param log Logger.
+         * @param errHnd Exception handler.
          */
         StripeConcurrentQueue(
             String igniteInstanceName,
             String poolName,
             int idx,
-            IgniteLogger log
+            IgniteLogger log,
+            Thread.UncaughtExceptionHandler errHnd
         ) {
-            this(igniteInstanceName, poolName, idx, log, null);
+            this(igniteInstanceName, poolName, idx, log, null, errHnd);
         }
 
         /**
@@ -591,19 +609,22 @@ public class StripedExecutor implements ExecutorService {
          * @param poolName Pool name.
          * @param idx Stripe index.
          * @param log Logger.
+         * @param errHnd Exception handler.
          */
         StripeConcurrentQueue(
             String igniteInstanceName,
             String poolName,
             int idx,
             IgniteLogger log,
-            Stripe[] others
+            Stripe[] others,
+            Thread.UncaughtExceptionHandler errHnd
         ) {
             super(
                 igniteInstanceName,
                 poolName,
                 idx,
-                log);
+                log,
+                errHnd);
 
             this.others = others;
 
@@ -702,17 +723,20 @@ public class StripedExecutor implements ExecutorService {
          * @param poolName Pool name.
          * @param idx Stripe index.
          * @param log Logger.
+         * @param errHnd Exception handler.
          */
         public StripeConcurrentQueueNoPark(
             String igniteInstanceName,
             String poolName,
             int idx,
-            IgniteLogger log
+            IgniteLogger log,
+            Thread.UncaughtExceptionHandler errHnd
         ) {
             super(igniteInstanceName,
                 poolName,
                 idx,
-                log);
+                log,
+                errHnd);
         }
 
         /** {@inheritDoc} */
@@ -758,17 +782,20 @@ public class StripedExecutor implements ExecutorService {
          * @param poolName Pool name.
          * @param idx Stripe index.
          * @param log Logger.
+         * @param errHnd Exception handler.
          */
         public StripeConcurrentBlockingQueue(
             String igniteInstanceName,
             String poolName,
             int idx,
-            IgniteLogger log
+            IgniteLogger log,
+            Thread.UncaughtExceptionHandler errHnd
         ) {
             super(igniteInstanceName,
                 poolName,
                 idx,
-                log);
+                log,
+                errHnd);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 0fcde0e..3597a05 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -77,6 +77,8 @@ import 
org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static 
org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MSG_WRITER;
 import static 
org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.NIO_OPERATION;
 
@@ -1749,6 +1751,8 @@ public class GridNioServer<T> {
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
+            Throwable err = null;
+
             try {
                 boolean reset = false;
 
@@ -1774,9 +1778,24 @@ public class GridNioServer<T> {
             catch (Throwable e) {
                 U.error(log, "Caught unhandled exception in NIO worker thread 
(restart the node).", e);
 
+                err = e;
+
                 if (e instanceof Error)
                     throw e;
             }
+            finally {
+                if (err instanceof OutOfMemoryError)
+                    lsnr.onFailure(CRITICAL_ERROR, err);
+                else if (!closed) {
+                    if (err == null)
+                        lsnr.onFailure(SYSTEM_WORKER_TERMINATION,
+                            new IllegalStateException("Thread " + name() + " 
is terminated unexpectedly"));
+                    else if (err instanceof InterruptedException)
+                        lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err);
+                }
+                else if (err != null)
+                    lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err);
+            }
         }
 
         /**
@@ -2790,6 +2809,8 @@ public class GridNioServer<T> {
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
+            Throwable err = null;
+
             try {
                 boolean reset = false;
 
@@ -2812,8 +2833,28 @@ public class GridNioServer<T> {
                     }
                 }
             }
+            catch (Throwable t) {
+                if (!(t instanceof IgniteInterruptedCheckedException))
+                    err = t;
+
+                throw t;
+            }
             finally {
-                closeSelector(); // Safety.
+                try {
+                    closeSelector(); // Safety.
+                }
+                catch (RuntimeException ignore) {
+                    // No-op.
+                }
+
+                if (err == null && !closed)
+                    err = new IllegalStateException("Thread " + name() + " is 
terminated unexpectedly");
+
+                if (err instanceof OutOfMemoryError)
+                    lsnr.onFailure(CRITICAL_ERROR, err);
+                else if (err != null)
+                    lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err);
+
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java
index db28792..14c5a74 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.util.nio;
 
+import org.apache.ignite.failure.FailureType;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -69,4 +70,9 @@ public interface GridNioServerListener<T> {
      * @param ses Session that is idle.
      */
     public void onSessionIdleTimeout(GridNioSession ses);
+
+    /**
+     * Called when critical failure occurs in server implementation.
+     */
+    public void onFailure(FailureType failureType, Throwable failure);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java
index 5d222c1..b6b20b2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.util.nio;
 
+import org.apache.ignite.failure.FailureType;
+
 /**
  * Server listener adapter providing empty methods implementation for rarely 
used methods.
  */
@@ -35,4 +37,8 @@ public abstract class GridNioServerListenerAdapter<T> 
implements GridNioServerLi
     @Override public void onMessageSent(GridNioSession ses, T msg) {
         // No-op.
     }
+
+    @Override public void onFailure(FailureType failureType, Throwable 
failure) {
+        // No-op.
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4a0710e..9e7b592 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -62,7 +62,10 @@ import org.apache.ignite.configuration.AddressResolver;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
@@ -151,6 +154,8 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static 
org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
 import static 
org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.SES_FUT_META;
 import static 
org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
@@ -798,6 +803,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                 }
             }
 
+            /** {@inheritDoc} */
+            @Override public void onFailure(FailureType failureType, Throwable 
failure) {
+                ((IgniteEx)ignite).context().failure().process(new 
FailureContext(failureType, failure));
+            }
+
             /**
              * @param recovery Recovery descriptor.
              * @param ses Session.
@@ -4190,13 +4200,32 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
             if (log.isDebugEnabled())
                 log.debug("Tcp communication worker has been started.");
 
-            while (!isInterrupted()) {
-                DisconnectedSessionInfo disconnectData = 
q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
+            Throwable err = null;
 
-                if (disconnectData != null)
-                    processDisconnect(disconnectData);
-                else
-                    processIdle();
+            try {
+                while (!isInterrupted()) {
+                    DisconnectedSessionInfo disconnectData = 
q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
+
+                    if (disconnectData != null)
+                        processDisconnect(disconnectData);
+                    else
+                        processIdle();
+                }
+            }
+            catch (Throwable t) {
+                if (!(t instanceof InterruptedException))
+                    err = t;
+
+                throw t;
+            }
+            finally {
+                if (err == null && !stopping)
+                    err = new IllegalStateException("Thread  " + getName() + " 
is terminated unexpectedly.");
+
+                if (err instanceof OutOfMemoryError)
+                    ((IgniteEx)ignite).context().failure().process(new 
FailureContext(CRITICAL_ERROR, err));
+                else if (err != null)
+                    ((IgniteEx)ignite).context().failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 4aa1316..7bf37e1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -50,6 +50,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -66,6 +67,8 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
@@ -73,6 +76,7 @@ import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
 import 
org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage;
+import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.processors.security.SecurityUtils;
 import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
@@ -137,7 +141,6 @@ import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustom
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
-import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
@@ -149,6 +152,8 @@ import static 
org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
 import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_COMPACT_FOOTER;
@@ -2609,12 +2614,20 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
+            Throwable err = null;
+
             try {
                 super.body();
             }
+            catch (InterruptedException e) {
+                if (!spi.isNodeStopping0())
+                    err = e;
+
+                throw e;
+            }
             catch (Throwable e) {
                 if (!spi.isNodeStopping0() && spiStateCopy() != DISCONNECTING) 
{
-                        final Ignite ignite = spi.ignite();
+                    final Ignite ignite = spi.ignite();
 
                     if (ignite != null) {
                         U.error(log, "TcpDiscoverSpi's message worker thread 
failed abnormally. " +
@@ -2637,9 +2650,22 @@ class ServerImpl extends TcpDiscoveryImpl {
                     }
                 }
 
+                err = e;
+
                 // Must be processed by IgniteSpiThread as well.
                 throw e;
             }
+            finally {
+                if (err == null && !spi.isNodeStopping0())
+                    err = new IllegalStateException("Thread " + getName() + " 
is terminated unexpectedly.");
+
+                FailureProcessor failure = 
((IgniteEx)spi.ignite()).context().failure();
+
+                if (err instanceof OutOfMemoryError)
+                    failure.process(new FailureContext(CRITICAL_ERROR, err));
+                else if (err != null)
+                    failure.process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
+            }
         }
 
         /**
@@ -5597,7 +5623,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
+        @Override protected void body() {
+            Throwable err = null;
+
             try {
                 while (!isInterrupted()) {
                     Socket sock = srvrSock.accept();
@@ -5630,13 +5658,30 @@ class ServerImpl extends TcpDiscoveryImpl {
                 onException("Failed to accept TCP connection.", e);
 
                 if (!isInterrupted()) {
+                    err = e;
+
                     if (U.isMacInvalidArgumentError(e))
                         U.error(log, "Failed to accept TCP connection\n\t" + 
U.MAC_INVALID_ARG_MSG, e);
                     else
                         U.error(log, "Failed to accept TCP connection.", e);
                 }
             }
+            catch (Throwable t) {
+                err = t;
+
+                throw t;
+            }
             finally {
+                if (err == null && !spi.isNodeStopping0())
+                    err = new IllegalStateException("Thread " + getName() + " 
is terminated unexpectedly.");
+
+                FailureProcessor failure = 
((IgniteEx)spi.ignite()).context().failure();
+
+                if (err instanceof OutOfMemoryError)
+                    failure.process(new FailureContext(CRITICAL_ERROR, err));
+                else if (err != null)
+                    failure.process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
+
                 U.closeQuiet(srvrSock);
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
index 543907f..3fca7af 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
@@ -29,7 +29,7 @@ public class StripedExecutorTest extends 
GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override public void beforeTest() {
-        stripedExecSvc = new StripedExecutor(3, "foo name", "pool name", new 
JavaLogger());
+        stripedExecSvc = new StripedExecutor(3, "foo name", "pool name", new 
JavaLogger(), (thread, t) -> {});
     }
 
     /** {@inheritDoc} */

Reply via email to