Repository: ignite Updated Branches: refs/heads/master 629327067 -> f3a61e4a4
IGNITE-8429 Unexpected error during incorrect WAL segment decompression, causes node termination Signed-off-by: Andrey Gura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f3a61e4a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f3a61e4a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f3a61e4a Branch: refs/heads/master Commit: f3a61e4a4753b31ecdcef0864e8c095214b6a4ae Parents: 6293270 Author: Ivan Rakov <[email protected]> Authored: Tue May 8 15:03:00 2018 +0300 Committer: Andrey Gura <[email protected]> Committed: Tue May 8 15:03:00 2018 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionSupplier.java | 4 +-- .../wal/FileWriteAheadLogManager.java | 32 ++++++++++++-------- .../wal/FsyncModeFileWriteAheadLogManager.java | 32 ++++++++++++-------- 3 files changed, 40 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f3a61e4a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 84e6828..4946d7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -32,10 +32,10 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.S; http://git-wip-us.apache.org/repos/asf/ignite/blob/f3a61e4a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 7795344..6ac102f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -33,6 +33,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.MappedByteBuffer; import java.nio.channels.ClosedByInterruptException; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.sql.Time; import java.util.ArrayList; @@ -2065,11 +2066,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public void run() { - Throwable err = null; - while (!Thread.currentThread().isInterrupted() && !stopped) { + long segmentToDecompress = -1L; + try { - long segmentToDecompress = segmentsQueue.take(); + segmentToDecompress = segmentsQueue.take(); if (stopped) break; @@ -2087,7 +2088,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl io.write(arr, 0, bytesRead); } - Files.move(unzipTmp.toPath(), unzip.toPath()); + try { + Files.move(unzipTmp.toPath(), unzip.toPath()); + } + catch (FileAlreadyExistsException e) { + U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " + + "[tmp=" + unzipTmp + ", raw=" + unzip + "]", e); + + if (!unzipTmp.delete()) + U.error(log, "Can't delete temporary unzipped segment [tmp=" + unzipTmp + "]"); + } synchronized (this) { decompressionFutures.remove(segmentToDecompress).onDone(); @@ -2097,16 +2107,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl Thread.currentThread().interrupt(); } catch (Throwable t) { - err = t; - } - finally { - if (err == null && !stopped) - err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly"); + if (!stopped && segmentToDecompress != -1L) { + IgniteCheckedException e = new IgniteCheckedException("Error during WAL segment " + + "decompression [segmentIdx=" + segmentToDecompress + "]", t); - if (err instanceof OutOfMemoryError) - cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); - else if (err != null) - cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + decompressionFutures.remove(segmentToDecompress).onDone(e); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f3a61e4a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java index dfb1c41..cf643fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java @@ -28,6 +28,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.sql.Time; import java.util.ArrayList; @@ -1873,11 +1874,11 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** {@inheritDoc} */ @Override public void run() { - Throwable err = null; - while (!Thread.currentThread().isInterrupted() && !stopped) { + long segmentToDecompress = -1L; + try { - long segmentToDecompress = segmentsQueue.take(); + segmentToDecompress = segmentsQueue.take(); if (stopped) break; @@ -1895,7 +1896,16 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda io.write(arr, 0, bytesRead); } - Files.move(unzipTmp.toPath(), unzip.toPath()); + try { + Files.move(unzipTmp.toPath(), unzip.toPath()); + } + catch (FileAlreadyExistsException e) { + U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " + + "[tmp=" + unzipTmp + ", raw=" + unzip + ']', e); + + if (!unzipTmp.delete()) + U.error(log, "Can't delete temporary unzipped segment [tmp=" + unzipTmp + ']'); + } synchronized (this) { decompressionFutures.remove(segmentToDecompress).onDone(); @@ -1905,16 +1915,12 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda Thread.currentThread().interrupt(); } catch (Throwable t) { - err = t; - } - finally { - if (err == null && !stopped) - err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly"); + if (!stopped && segmentToDecompress != -1L) { + IgniteCheckedException e = new IgniteCheckedException("Error during WAL segment " + + "decompression [segmentIdx=" + segmentToDecompress + ']', t); - if (err instanceof OutOfMemoryError) - cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); - else if (err != null) - cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + decompressionFutures.remove(segmentToDecompress).onDone(e); + } } } }
