ignite-7772 System workers critical failures handling Signed-off-by: Andrey Gura <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c807ae95 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c807ae95 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c807ae95 Branch: refs/heads/ignite-6083 Commit: c807ae952c233cf1a8c0a63d543fafe19c40c6aa Parents: 05d7092 Author: Andrey Kuznetsov <[email protected]> Authored: Tue Apr 10 17:30:12 2018 +0300 Committer: Andrey Gura <[email protected]> Committed: Tue Apr 10 17:30:12 2018 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgnitionEx.java | 15 +- .../GridClientConnectionManagerAdapter.java | 6 + .../impl/GridTcpRouterNioListenerAdapter.java | 6 + .../discovery/GridDiscoveryManager.java | 16 +- .../GridCachePartitionExchangeManager.java | 12 +- .../cache/GridCacheSharedTtlCleanupManager.java | 41 +++-- .../GridCacheDatabaseSharedManager.java | 60 +++++-- .../wal/FileWriteAheadLogManager.java | 157 ++++++++++++------- .../wal/FsyncModeFileWriteAheadLogManager.java | 34 +++- .../timeout/GridTimeoutProcessor.java | 102 +++++++----- .../ignite/internal/util/StripedExecutor.java | 69 +++++--- .../ignite/internal/util/nio/GridNioServer.java | 43 ++++- .../util/nio/GridNioServerListener.java | 6 + .../util/nio/GridNioServerListenerAdapter.java | 6 + .../communication/tcp/TcpCommunicationSpi.java | 41 ++++- .../ignite/spi/discovery/tcp/ServerImpl.java | 51 +++++- .../internal/util/StripedExecutorTest.java | 2 +- 17 files changed, 501 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 417ba1e..10a0752 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -137,6 +137,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME; import static org.apache.ignite.configuration.MemoryConfiguration.DFLT_MEMORY_POLICY_MAX_SIZE; import static org.apache.ignite.configuration.MemoryConfiguration.DFLT_MEM_PLC_DEFAULT_NAME; +import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.IgniteComponentType.SPRING; import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.RESTART_JVM; @@ -1806,7 +1807,13 @@ public class IgnitionEx { cfg.getStripedPoolSize(), cfg.getIgniteInstanceName(), "sys", - log); + log, + new Thread.UncaughtExceptionHandler() { + @Override public void uncaughtException(Thread thread, Throwable t) { + if (grid != null) + grid.context().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, t)); + } + }); // Note that since we use 'LinkedBlockingQueue', number of // maximum threads has no effect. @@ -1846,6 +1853,12 @@ public class IgnitionEx { cfg.getIgniteInstanceName(), "data-streamer", log, + new Thread.UncaughtExceptionHandler() { + @Override public void uncaughtException(Thread thread, Throwable t) { + if (grid != null) + grid.context().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, t)); + } + }, true); // Note that we do not pre-start threads here as igfs pool may not be needed. http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java index 829b188..fe0453f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java @@ -38,6 +38,7 @@ import java.util.logging.Logger; import javax.net.ssl.SSLContext; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.client.GridClientClosedException; import org.apache.ignite.internal.client.GridClientConfiguration; import org.apache.ignite.internal.client.GridClientException; @@ -656,6 +657,11 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo } } + /** {@inheritDoc} */ + @Override public void onFailure(FailureType failureType, Throwable failure) { + // No-op. + } + /** * Handles client handshake response. * http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java index 22f5152..75aa6f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.client.GridClientException; import org.apache.ignite.internal.client.GridClientFuture; import org.apache.ignite.internal.client.GridClientFutureListener; @@ -191,6 +192,11 @@ public abstract class GridTcpRouterNioListenerAdapter implements GridNioServerLi } /** {@inheritDoc} */ + @Override public void onFailure(FailureType failureType, Throwable failure) { + // No-op. + } + + /** {@inheritDoc} */ @Override public void onSessionWriteTimeout(GridNioSession ses) { U.warn(log, "Closing NIO session because of write timeout."); http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 4c5690e..b0d3256 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -147,6 +147,8 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED; import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; +import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; +import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_REGIONS_OFFHEAP_SIZE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT; @@ -2669,13 +2671,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { body0(); } catch (InterruptedException e) { + if (!isCancelled) + ctx.failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, e)); + throw e; } catch (Throwable t) { - U.error(log, "Unexpected exception in discovery worker thread (ignored).", t); + U.error(log, "Exception in discovery worker thread.", t); + + if (t instanceof Error) { + FailureType type = t instanceof OutOfMemoryError ? CRITICAL_ERROR : SYSTEM_WORKER_TERMINATION; - if (t instanceof Error) - throw (Error)t; + ctx.failure().process(new FailureContext(type, t)); + + throw t; + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 77ffce3..e40493f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -123,6 +123,7 @@ import static org.apache.ignite.IgniteSystemProperties.getLong; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; @@ -2274,11 +2275,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana try { body0(); } + catch (InterruptedException | IgniteInterruptedCheckedException e) { + if (!stop) + err = e; + } catch (Throwable e) { err = e; } finally { - if (!stop) + if (err == null && !stop) + err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly"); + + if (err instanceof OutOfMemoryError) + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java index 8f3d738..613e93b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java @@ -20,11 +20,15 @@ package org.apache.ignite.internal.processors.cache; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.failure.FailureContext; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.thread.IgniteThread; +import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; +import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; + /** * Periodically removes expired entities from caches with {@link CacheConfiguration#isEagerTtl()} flag set. */ @@ -122,19 +126,38 @@ public class GridCacheSharedTtlCleanupManager extends GridCacheSharedManagerAdap /** {@inheritDoc} */ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - while (!isCancelled()) { - boolean expiredRemains = false; + Throwable err = null; + + try { + while (!isCancelled()) { + boolean expiredRemains = false; + + for (GridCacheTtlManager mgr : mgrs) { + if (mgr.expire(CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT)) + expiredRemains = true; - for (GridCacheTtlManager mgr : mgrs) { - if (mgr.expire(CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT)) - expiredRemains = true; + if (isCancelled()) + return; + } - if (isCancelled()) - return; + if (!expiredRemains) + U.sleep(CLEANUP_WORKER_SLEEP_INTERVAL); } + } + catch (Throwable t) { + if (!(t instanceof IgniteInterruptedCheckedException)) + err = t; - if (!expiredRemains) - U.sleep(CLEANUP_WORKER_SLEEP_INTERVAL); + throw t; + } + finally { + if (err == null && !isCancelled) + err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly"); + + if (err instanceof OutOfMemoryError) + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) + cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 70fc688..caf27b7 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -165,6 +165,8 @@ import static java.nio.file.StandardOpenOption.READ; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD; +import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; +import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID; /** @@ -2787,32 +2789,58 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - while (!isCancelled()) { - waitCheckpointEvent(); + @Override protected void body() { + Throwable err = null; - GridFutureAdapter<Void> enableChangeApplied = GridCacheDatabaseSharedManager.this.enableChangeApplied; + try { + while (!isCancelled()) { + waitCheckpointEvent(); - if (enableChangeApplied != null) { - enableChangeApplied.onDone(); + GridFutureAdapter<Void> enableChangeApplied = GridCacheDatabaseSharedManager.this.enableChangeApplied; - GridCacheDatabaseSharedManager.this.enableChangeApplied = null; - } + if (enableChangeApplied != null) { + enableChangeApplied.onDone(); - if (checkpointsEnabled) - doCheckpoint(); - else { - synchronized (this) { - scheduledCp.nextCpTs = U.currentTimeMillis() + checkpointFreq; + GridCacheDatabaseSharedManager.this.enableChangeApplied = null; + } + + if (checkpointsEnabled) + doCheckpoint(); + else { + synchronized (this) { + scheduledCp.nextCpTs = U.currentTimeMillis() + checkpointFreq; + } } } } + catch (Throwable t) { + err = t; + + scheduledCp.cpFinishFut.onDone(t); + + throw t; + } + finally { + if (err == null && !(stopping && isCancelled)) + err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly"); + + if (err instanceof OutOfMemoryError) + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) + cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + } // Final run after the cancellation. - if (checkpointsEnabled && !shutdownNow) - doCheckpoint(); + if (checkpointsEnabled && !shutdownNow) { + try { + doCheckpoint(); - scheduledCp.cpFinishFut.onDone(new NodeStoppingException("Node is stopping.")); + scheduledCp.cpFinishFut.onDone(new NodeStoppingException("Node is stopping.")); + } + catch (Throwable e) { + scheduledCp.cpFinishFut.onDone(e); + } + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 2fff481..a40811b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -117,6 +117,8 @@ import static java.nio.file.StandardOpenOption.WRITE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION; import static org.apache.ignite.configuration.WALMode.LOG_ONLY; +import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; +import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD; import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT; import static org.apache.ignite.internal.util.IgniteUtils.findField; @@ -682,7 +684,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl catch (IgniteCheckedException e) { U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e); - cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)); } } @@ -1234,7 +1236,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl catch (IOException e) { StorageException se = new StorageException("Unable to initialize WAL segment", e); - cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, se)); + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se)); throw se; } @@ -1499,6 +1501,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } } + Throwable err = null; + try { synchronized (this) { while (curAbsWalIdx == -1 && !stopped) @@ -1560,6 +1564,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } + catch (Throwable t) { + err = t; + } + finally { + if (err == null && !stopped) + err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly"); + + if (err instanceof OutOfMemoryError) + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) + cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + } } /** @@ -1884,8 +1900,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } catch (IgniteCheckedException | IOException e) { U.error(log, "Unexpected error during WAL compression", e); - - cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); @@ -2005,6 +2019,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public void run() { + Throwable err = null; + while (!Thread.currentThread().isInterrupted() && !stopped) { try { long segmentToDecompress = segmentsQueue.take(); @@ -2034,10 +2050,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } - catch (IOException e) { - U.error(log, "Unexpected error during WAL decompression", e); + catch (Throwable t) { + err = t; + } + finally { + if (err == null && !stopped) + err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly"); - cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + if (err instanceof OutOfMemoryError) + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) + cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); } } } @@ -3146,78 +3169,94 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public void run() { - while (!shutdown && !Thread.currentThread().isInterrupted()) { - while (waiters.isEmpty()) { - if (!shutdown) - LockSupport.park(); - else { - unparkWaiters(Long.MAX_VALUE); - - return; - } - } + Throwable err = null; - Long pos = null; + try { + while (!shutdown && !Thread.currentThread().isInterrupted()) { + while (waiters.isEmpty()) { + if (!shutdown) + LockSupport.park(); + else { + unparkWaiters(Long.MAX_VALUE); - for (Long val : waiters.values()) { - if (val > Long.MIN_VALUE) - pos = val; - } + return; + } + } - if (pos == null) - continue; - else if (pos < UNCONDITIONAL_FLUSH) { - try { - assert pos == FILE_CLOSE || pos == FILE_FORCE : pos; + Long pos = null; - if (pos == FILE_CLOSE) - currHnd.fileIO.close(); - else if (pos == FILE_FORCE) - currHnd.fileIO.force(); + for (Long val : waiters.values()) { + if (val > Long.MIN_VALUE) + pos = val; } - catch (IOException e) { - log.error("Exception in WAL writer thread: ", e); - err = e; + if (pos == null) + continue; + else if (pos < UNCONDITIONAL_FLUSH) { + try { + assert pos == FILE_CLOSE || pos == FILE_FORCE : pos; - unparkWaiters(Long.MAX_VALUE); + if (pos == FILE_CLOSE) + currHnd.fileIO.close(); + else if (pos == FILE_FORCE) + currHnd.fileIO.force(); + } + catch (IOException e) { + log.error("Exception in WAL writer thread: ", e); - return; - } + err = e; - unparkWaiters(pos); - } + unparkWaiters(Long.MAX_VALUE); - List<SegmentedRingByteBuffer.ReadSegment> segs = currentHandle().buf.poll(pos); + return; + } - if (segs == null) { - unparkWaiters(pos); + unparkWaiters(pos); + } - continue; - } + List<SegmentedRingByteBuffer.ReadSegment> segs = currentHandle().buf.poll(pos); - for (int i = 0; i < segs.size(); i++) { - SegmentedRingByteBuffer.ReadSegment seg = segs.get(i); + if (segs == null) { + unparkWaiters(pos); - try { - writeBuffer(seg.position(), seg.buffer()); + continue; } - catch (Throwable e) { - log.error("Exception in WAL writer thread: ", e); - err = e; - } - finally { - seg.release(); + for (int i = 0; i < segs.size(); i++) { + SegmentedRingByteBuffer.ReadSegment seg = segs.get(i); + + try { + writeBuffer(seg.position(), seg.buffer()); + } + catch (Throwable e) { + log.error("Exception in WAL writer thread: ", e); + + err = e; + } + finally { + seg.release(); - long p = pos <= UNCONDITIONAL_FLUSH || err != null ? Long.MAX_VALUE : currentHandle().written; + long p = pos <= UNCONDITIONAL_FLUSH || err != null ? Long.MAX_VALUE : currentHandle().written; - unparkWaiters(p); + unparkWaiters(p); + } } } + + unparkWaiters(Long.MAX_VALUE); } + catch (Throwable t) { + err = t; + } + finally { + if (err == null && !shutdown) + err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly"); - unparkWaiters(Long.MAX_VALUE); + if (err instanceof OutOfMemoryError) + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) + cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + } } /** @@ -3283,7 +3322,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl Throwable err = walWriter.err; if (err != null) - cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, err)); + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); if (expPos == UNCONDITIONAL_FLUSH) expPos = (currentHandle().buf.tail()); @@ -3372,7 +3411,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl catch (IOException e) { StorageException se = new StorageException("Unable to write", e); - cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, se)); + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se)); throw se; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java index 59196bb..c7d2c11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java @@ -110,6 +110,7 @@ import static java.nio.file.StandardOpenOption.READ; import static java.nio.file.StandardOpenOption.WRITE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; +import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; /** * File WAL manager. @@ -1338,6 +1339,8 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda } } + Throwable err = null; + try { synchronized (this) { while (curAbsWalIdx == -1 && !stopped) @@ -1399,6 +1402,18 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } + catch (Throwable t) { + err = t; + } + finally { + if (err == null && !stopped) + err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly"); + + if (err instanceof OutOfMemoryError) + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) + cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + } } /** @@ -1721,8 +1736,6 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda } catch (IgniteCheckedException | IOException e) { U.error(log, "Unexpected error during WAL compression", e); - - cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -1814,6 +1827,8 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** {@inheritDoc} */ @Override public void run() { + Throwable err = null; + while (!Thread.currentThread().isInterrupted() && !stopped) { try { long segmentToDecompress = segmentsQueue.take(); @@ -1840,13 +1855,20 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda decompressionFutures.remove(segmentToDecompress).onDone(); } } - catch (InterruptedException e){ + catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } - catch (IOException e) { - U.error(log, "Unexpected error during WAL decompression", e); + catch (Throwable t) { + err = t; + } + finally { + if (err == null && !stopped) + err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly"); - cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)); + if (err instanceof OutOfMemoryError) + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) + cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java index ff6beb4..a09d6fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.util.Comparator; import java.util.Iterator; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.failure.FailureContext; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.util.GridConcurrentSkipListSet; @@ -32,6 +33,9 @@ import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; +import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; +import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; + /** * Detects timeout events and processes them. */ @@ -146,61 +150,81 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { - while (!isCancelled()) { - long now = U.currentTimeMillis(); + Throwable err = null; - for (Iterator<GridTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext();) { - GridTimeoutObject timeoutObj = iter.next(); + try { + while (!isCancelled()) { + long now = U.currentTimeMillis(); - if (timeoutObj.endTime() <= now) { - try { - boolean rmvd = timeoutObjs.remove(timeoutObj); + for (Iterator<GridTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) { + GridTimeoutObject timeoutObj = iter.next(); - if (log.isDebugEnabled()) - log.debug("Timeout has occurred [obj=" + timeoutObj + ", process=" + rmvd + ']'); + if (timeoutObj.endTime() <= now) { + try { + boolean rmvd = timeoutObjs.remove(timeoutObj); - if (rmvd) - timeoutObj.onTimeout(); - } - catch (Throwable e) { - if (isCancelled() && !(e instanceof Error)){ if (log.isDebugEnabled()) - log.debug("Error when executing timeout callback: " + timeoutObj); + log.debug("Timeout has occurred [obj=" + timeoutObj + ", process=" + rmvd + ']'); - return; + if (rmvd) + timeoutObj.onTimeout(); } + catch (Throwable e) { + if (isCancelled() && !(e instanceof Error)) { + if (log.isDebugEnabled()) + log.debug("Error when executing timeout callback: " + timeoutObj); - U.error(log, "Error when executing timeout callback: " + timeoutObj, e); + return; + } - if (e instanceof Error) - throw e; + U.error(log, "Error when executing timeout callback: " + timeoutObj, e); + + if (e instanceof Error) + throw e; + } } + else + break; } - else - break; - } - - synchronized (mux) { - while (!isCancelled()) { - // Access of the first element must be inside of - // synchronization block, so we don't miss out - // on thread notification events sent from - // 'addTimeoutObject(..)' method. - GridTimeoutObject first = timeoutObjs.firstx(); - - if (first != null) { - long waitTime = first.endTime() - U.currentTimeMillis(); - if (waitTime > 0) - mux.wait(waitTime); + synchronized (mux) { + while (!isCancelled()) { + // Access of the first element must be inside of + // synchronization block, so we don't miss out + // on thread notification events sent from + // 'addTimeoutObject(..)' method. + GridTimeoutObject first = timeoutObjs.firstx(); + + if (first != null) { + long waitTime = first.endTime() - U.currentTimeMillis(); + + if (waitTime > 0) + mux.wait(waitTime); + else + break; + } else - break; + mux.wait(5000); } - else - mux.wait(5000); } } } + catch (Throwable t) { + if (!(t instanceof InterruptedException)) + err = t; + + throw t; + } + finally { + if (err == null && !isCancelled) + err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly."); + + if (err instanceof OutOfMemoryError) + ctx.failure().process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) + ctx.failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + } + } } @@ -284,4 +308,4 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { return S.toString(CancelableTask.class, this); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java index 630d34c..c6383ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java @@ -64,9 +64,11 @@ public class StripedExecutor implements ExecutorService { * @param igniteInstanceName Node name. * @param poolName Pool name. * @param log Logger. + * @param errHnd Exception handler. */ - public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log) { - this(cnt, igniteInstanceName, poolName, log, false); + public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log, + Thread.UncaughtExceptionHandler errHnd) { + this(cnt, igniteInstanceName, poolName, log, errHnd, false); } /** @@ -74,9 +76,11 @@ public class StripedExecutor implements ExecutorService { * @param igniteInstanceName Node name. * @param poolName Pool name. * @param log Logger. + * @param errHnd Exception handler. * @param stealTasks {@code True} to steal tasks. */ - public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log, boolean stealTasks) { + public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log, + Thread.UncaughtExceptionHandler errHnd, boolean stealTasks) { A.ensure(cnt > 0, "cnt > 0"); boolean success = false; @@ -91,15 +95,9 @@ public class StripedExecutor implements ExecutorService { try { for (int i = 0; i < cnt; i++) { - stripes[i] = stealTasks ? new StripeConcurrentQueue( - igniteInstanceName, - poolName, - i, - log, stripes) : new StripeConcurrentQueue( - igniteInstanceName, - poolName, - i, - log); + stripes[i] = stealTasks + ? new StripeConcurrentQueue(igniteInstanceName, poolName, i, log, stripes, errHnd) + : new StripeConcurrentQueue(igniteInstanceName, poolName, i, log, errHnd); } for (int i = 0; i < cnt; i++) @@ -434,22 +432,28 @@ public class StripedExecutor implements ExecutorService { /** Thread executing the loop. */ protected Thread thread; + /** Exception handler. */ + private Thread.UncaughtExceptionHandler errHnd; + /** * @param igniteInstanceName Ignite instance name. * @param poolName Pool name. * @param idx Stripe index. * @param log Logger. + * @param errHnd Exception handler. */ public Stripe( String igniteInstanceName, String poolName, int idx, - IgniteLogger log + IgniteLogger log, + Thread.UncaughtExceptionHandler errHnd ) { this.igniteInstanceName = igniteInstanceName; this.poolName = poolName; this.idx = idx; this.log = log; + this.errHnd = errHnd; } /** @@ -463,6 +467,8 @@ public class StripedExecutor implements ExecutorService { idx, GridIoPolicy.UNDEFINED); + thread.setUncaughtExceptionHandler(errHnd); + thread.start(); } @@ -518,9 +524,19 @@ public class StripedExecutor implements ExecutorService { return; } catch (Throwable e) { + if (e instanceof OutOfMemoryError) { + // Re-throwing to exploit uncaught exception handler. + throw e; + } + U.error(log, "Failed to execute runnable.", e); } } + + if (!stopping) { + throw new IllegalStateException("Thread " + Thread.currentThread().getName() + + " is terminated unexpectedly"); + } } /** @@ -576,14 +592,16 @@ public class StripedExecutor implements ExecutorService { * @param poolName Pool name. * @param idx Stripe index. * @param log Logger. + * @param errHnd Exception handler. */ StripeConcurrentQueue( String igniteInstanceName, String poolName, int idx, - IgniteLogger log + IgniteLogger log, + Thread.UncaughtExceptionHandler errHnd ) { - this(igniteInstanceName, poolName, idx, log, null); + this(igniteInstanceName, poolName, idx, log, null, errHnd); } /** @@ -591,19 +609,22 @@ public class StripedExecutor implements ExecutorService { * @param poolName Pool name. * @param idx Stripe index. * @param log Logger. + * @param errHnd Exception handler. */ StripeConcurrentQueue( String igniteInstanceName, String poolName, int idx, IgniteLogger log, - Stripe[] others + Stripe[] others, + Thread.UncaughtExceptionHandler errHnd ) { super( igniteInstanceName, poolName, idx, - log); + log, + errHnd); this.others = others; @@ -702,17 +723,20 @@ public class StripedExecutor implements ExecutorService { * @param poolName Pool name. * @param idx Stripe index. * @param log Logger. + * @param errHnd Exception handler. */ public StripeConcurrentQueueNoPark( String igniteInstanceName, String poolName, int idx, - IgniteLogger log + IgniteLogger log, + Thread.UncaughtExceptionHandler errHnd ) { super(igniteInstanceName, poolName, idx, - log); + log, + errHnd); } /** {@inheritDoc} */ @@ -758,17 +782,20 @@ public class StripedExecutor implements ExecutorService { * @param poolName Pool name. * @param idx Stripe index. * @param log Logger. + * @param errHnd Exception handler. */ public StripeConcurrentBlockingQueue( String igniteInstanceName, String poolName, int idx, - IgniteLogger log + IgniteLogger log, + Thread.UncaughtExceptionHandler errHnd ) { super(igniteInstanceName, poolName, idx, - log); + log, + errHnd); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 0fcde0e..3597a05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -77,6 +77,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; +import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MSG_WRITER; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.NIO_OPERATION; @@ -1749,6 +1751,8 @@ public class GridNioServer<T> { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + Throwable err = null; + try { boolean reset = false; @@ -1774,9 +1778,24 @@ public class GridNioServer<T> { catch (Throwable e) { U.error(log, "Caught unhandled exception in NIO worker thread (restart the node).", e); + err = e; + if (e instanceof Error) throw e; } + finally { + if (err instanceof OutOfMemoryError) + lsnr.onFailure(CRITICAL_ERROR, err); + else if (!closed) { + if (err == null) + lsnr.onFailure(SYSTEM_WORKER_TERMINATION, + new IllegalStateException("Thread " + name() + " is terminated unexpectedly")); + else if (err instanceof InterruptedException) + lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err); + } + else if (err != null) + lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err); + } } /** @@ -2790,6 +2809,8 @@ public class GridNioServer<T> { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + Throwable err = null; + try { boolean reset = false; @@ -2812,8 +2833,28 @@ public class GridNioServer<T> { } } } + catch (Throwable t) { + if (!(t instanceof IgniteInterruptedCheckedException)) + err = t; + + throw t; + } finally { - closeSelector(); // Safety. + try { + closeSelector(); // Safety. + } + catch (RuntimeException ignore) { + // No-op. + } + + if (err == null && !closed) + err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly"); + + if (err instanceof OutOfMemoryError) + lsnr.onFailure(CRITICAL_ERROR, err); + else if (err != null) + lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java index db28792..14c5a74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.util.nio; +import org.apache.ignite.failure.FailureType; import org.jetbrains.annotations.Nullable; /** @@ -69,4 +70,9 @@ public interface GridNioServerListener<T> { * @param ses Session that is idle. */ public void onSessionIdleTimeout(GridNioSession ses); + + /** + * Called when critical failure occurs in server implementation. + */ + public void onFailure(FailureType failureType, Throwable failure); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java index 5d222c1..b6b20b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.util.nio; +import org.apache.ignite.failure.FailureType; + /** * Server listener adapter providing empty methods implementation for rarely used methods. */ @@ -35,4 +37,8 @@ public abstract class GridNioServerListenerAdapter<T> implements GridNioServerLi @Override public void onMessageSent(GridNioSession ses, T msg) { // No-op. } + + @Override public void onFailure(FailureType failureType, Throwable failure) { + // No-op. + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 4a0710e..9e7b592 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -62,7 +62,10 @@ import org.apache.ignite.configuration.AddressResolver; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; @@ -151,6 +154,8 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; +import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META; import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.SES_FUT_META; import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED; @@ -798,6 +803,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } } + /** {@inheritDoc} */ + @Override public void onFailure(FailureType failureType, Throwable failure) { + ((IgniteEx)ignite).context().failure().process(new FailureContext(failureType, failure)); + } + /** * @param recovery Recovery descriptor. * @param ses Session. @@ -4190,13 +4200,32 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (log.isDebugEnabled()) log.debug("Tcp communication worker has been started."); - while (!isInterrupted()) { - DisconnectedSessionInfo disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); + Throwable err = null; - if (disconnectData != null) - processDisconnect(disconnectData); - else - processIdle(); + try { + while (!isInterrupted()) { + DisconnectedSessionInfo disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); + + if (disconnectData != null) + processDisconnect(disconnectData); + else + processIdle(); + } + } + catch (Throwable t) { + if (!(t instanceof InterruptedException)) + err = t; + + throw t; + } + finally { + if (err == null && !stopping) + err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly."); + + if (err instanceof OutOfMemoryError) + ((IgniteEx)ignite).context().failure().process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) + ((IgniteEx)ignite).context().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 4aa1316..7bf37e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -50,6 +50,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.BlockingDeque; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingDeque; @@ -66,6 +67,8 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteNodeAttributes; @@ -73,6 +76,7 @@ import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage; +import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.security.SecurityUtils; import org.apache.ignite.internal.util.GridBoundedLinkedHashSet; @@ -137,7 +141,6 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustom import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage; import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.Nullable; -import java.util.concurrent.ConcurrentHashMap; import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE; @@ -149,6 +152,8 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED; import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; +import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; +import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_COMPACT_FOOTER; @@ -2609,12 +2614,20 @@ class ServerImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { + Throwable err = null; + try { super.body(); } + catch (InterruptedException e) { + if (!spi.isNodeStopping0()) + err = e; + + throw e; + } catch (Throwable e) { if (!spi.isNodeStopping0() && spiStateCopy() != DISCONNECTING) { - final Ignite ignite = spi.ignite(); + final Ignite ignite = spi.ignite(); if (ignite != null) { U.error(log, "TcpDiscoverSpi's message worker thread failed abnormally. " + @@ -2637,9 +2650,22 @@ class ServerImpl extends TcpDiscoveryImpl { } } + err = e; + // Must be processed by IgniteSpiThread as well. throw e; } + finally { + if (err == null && !spi.isNodeStopping0()) + err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly."); + + FailureProcessor failure = ((IgniteEx)spi.ignite()).context().failure(); + + if (err instanceof OutOfMemoryError) + failure.process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) + failure.process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + } } /** @@ -5597,7 +5623,9 @@ class ServerImpl extends TcpDiscoveryImpl { } /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { + @Override protected void body() { + Throwable err = null; + try { while (!isInterrupted()) { Socket sock = srvrSock.accept(); @@ -5630,13 +5658,30 @@ class ServerImpl extends TcpDiscoveryImpl { onException("Failed to accept TCP connection.", e); if (!isInterrupted()) { + err = e; + if (U.isMacInvalidArgumentError(e)) U.error(log, "Failed to accept TCP connection\n\t" + U.MAC_INVALID_ARG_MSG, e); else U.error(log, "Failed to accept TCP connection.", e); } } + catch (Throwable t) { + err = t; + + throw t; + } finally { + if (err == null && !spi.isNodeStopping0()) + err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly."); + + FailureProcessor failure = ((IgniteEx)spi.ignite()).context().failure(); + + if (err instanceof OutOfMemoryError) + failure.process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) + failure.process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + U.closeQuiet(srvrSock); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java index 543907f..3fca7af 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java @@ -29,7 +29,7 @@ public class StripedExecutorTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override public void beforeTest() { - stripedExecSvc = new StripedExecutor(3, "foo name", "pool name", new JavaLogger()); + stripedExecSvc = new StripedExecutor(3, "foo name", "pool name", new JavaLogger(), (thread, t) -> {}); } /** {@inheritDoc} */
