Repository: ignite Updated Branches: refs/heads/master 324e61056 -> b8a421ee3
IGNITE-8938 Failure handling for file-decompressor thread added Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8a421ee Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8a421ee Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8a421ee Branch: refs/heads/master Commit: b8a421ee38d08c47aea7bdbf37a753b87eef5a58 Parents: 324e610 Author: Andrey Gura <[email protected]> Authored: Thu Jul 5 19:40:26 2018 +0300 Committer: Andrey Gura <[email protected]> Committed: Fri Jul 13 19:08:35 2018 +0300 ---------------------------------------------------------------------- .../wal/FileWriteAheadLogManager.java | 114 +++++++++------- .../wal/FsyncModeFileWriteAheadLogManager.java | 130 +++++++++++------- .../worker/WorkersControlMXBeanImpl.java | 7 +- .../failure/SystemWorkersTerminationTest.java | 132 +++++++++++++++++++ .../ignite/testsuites/IgniteBasicTestSuite.java | 2 + 5 files changed, 291 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b8a421ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 3a8cd15..2988208 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -444,10 +444,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (dsCfg.isWalCompactionEnabled()) { compressor = new FileCompressor(); - decompressor = new FileDecompressor(log); + if (decompressor == null) { // Preventing of two file-decompressor thread instantiations. + decompressor = new FileDecompressor(log); - if (!cctx.kernalContext().clientNode()) new IgniteThread(decompressor).start(); + } } walDisableContext = cctx.walState().walDisableContext(); @@ -1667,8 +1668,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } } } - catch (InterruptedException ignore) { + catch (InterruptedException t) { Thread.currentThread().interrupt(); + + if (!stopped) + err = t; } catch (Throwable t) { err = t; @@ -1678,9 +1682,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly"); if (err instanceof OutOfMemoryError) - cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); + failureProcessor.process(new FailureContext(CRITICAL_ERROR, err)); else if (err != null) - cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + failureProcessor.process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); } } @@ -2141,61 +2145,81 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param log Logger. */ FileDecompressor(IgniteLogger log) { - super(cctx.igniteInstanceName(), "wal-file-decompressor%" + cctx.igniteInstanceName(), log); + super(cctx.igniteInstanceName(), "wal-file-decompressor%" + cctx.igniteInstanceName(), log, + cctx.kernalContext().workersRegistry()); } /** {@inheritDoc} */ @Override protected void body() { - while (!isCancelled()) { - long segmentToDecompress = -1L; + Throwable err = null; - try { - segmentToDecompress = segmentsQueue.take(); + try { + while (!isCancelled()) { + long segmentToDecompress = -1L; - if (isCancelled()) - break; + try { + segmentToDecompress = segmentsQueue.take(); - File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".zip"); - File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".tmp"); - File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)); + if (isCancelled()) + break; - try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip))); - FileIO io = ioFactory.create(unzipTmp)) { - zis.getNextEntry(); + File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".zip"); + File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".tmp"); + File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)); - while (io.writeFully(arr, 0, zis.read(arr)) > 0) - ; - } + try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip))); + FileIO io = ioFactory.create(unzipTmp)) { + zis.getNextEntry(); - try { - Files.move(unzipTmp.toPath(), unzip.toPath()); - } - catch (FileAlreadyExistsException e) { - U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " + - "[tmp=" + unzipTmp + ", raw=" + unzip + "]", e); + while (io.writeFully(arr, 0, zis.read(arr)) > 0) + ; + } - if (!unzipTmp.delete()) - U.error(log, "Can't delete temporary unzipped segment [tmp=" + unzipTmp + "]"); - } + try { + Files.move(unzipTmp.toPath(), unzip.toPath()); + } + catch (FileAlreadyExistsException e) { + U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " + + "[tmp=" + unzipTmp + ", raw=" + unzip + "]", e); - synchronized (this) { - decompressionFutures.remove(segmentToDecompress).onDone(); - } - } - catch (InterruptedException ignore) { - Thread.currentThread().interrupt(); - } - catch (Throwable t) { - if (!isCancelled && segmentToDecompress != -1L) { - IgniteCheckedException e = new IgniteCheckedException("Error during WAL segment " + - "decompression [segmentIdx=" + segmentToDecompress + "]", t); + if (!unzipTmp.delete()) + U.error(log, "Can't delete temporary unzipped segment [tmp=" + unzipTmp + "]"); + } synchronized (this) { - decompressionFutures.remove(segmentToDecompress).onDone(e); + decompressionFutures.remove(segmentToDecompress).onDone(); + } + } + catch (IOException ex) { + if (!isCancelled && segmentToDecompress != -1L) { + IgniteCheckedException e = new IgniteCheckedException("Error during WAL segment " + + "decompression [segmentIdx=" + segmentToDecompress + "]", ex); + + synchronized (this) { + decompressionFutures.remove(segmentToDecompress).onDone(e); + } } } } } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + if (!isCancelled) + err = e; + } + catch (Throwable t) { + err = t; + } + finally { + if (err == null && !isCancelled) + err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly"); + + if (err instanceof OutOfMemoryError) + failureProcessor.process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) + failureProcessor.process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + } } /** @@ -2221,10 +2245,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return res; } - /** - * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown. - */ - private void shutdown() throws IgniteInterruptedCheckedException { + /** */ + private void shutdown() { synchronized (this) { U.cancel(this); http://git-wip-us.apache.org/repos/asf/ignite/blob/b8a421ee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java index b11b829..2f64bd7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java @@ -383,7 +383,11 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda if (dsCfg.isWalCompactionEnabled()) { compressor = new FileCompressor(); - decompressor = new FileDecompressor(log); + if (decompressor == null) { // Preventing of two file-decompressor thread instantiations. + decompressor = new FileDecompressor(log); + + new IgniteThread(decompressor).start(); + } } walDisableContext = cctx.walState().walDisableContext(); @@ -457,15 +461,14 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda start0(); if (!cctx.kernalContext().clientNode()) { - assert archiver != null; + if (isArchiverEnabled()) { + assert archiver != null; - new IgniteThread(archiver).start(); + new IgniteThread(archiver).start(); + } if (compressor != null) compressor.start(); - - if (decompressor != null) - new IgniteThread(decompressor).start(); } } @@ -555,6 +558,18 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda } /** + * Archiver can be not created, all files will be written to WAL folder, using absolute segment index. + * + * @return flag indicating if archiver is disabled. + */ + private boolean isArchiverEnabled() { + if (walArchiveDir != null && walWorkDir != null) + return !walArchiveDir.equals(walWorkDir); + + return !new File(dsCfg.getWalArchivePath()).equals(new File(dsCfg.getWalPath())); + } + + /** * Collect wal segment files from low pointer (include) to high pointer (not include) and reserve low pointer. * * @param low Low bound. @@ -1499,8 +1514,11 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda } } } - catch (InterruptedException ignore) { + catch (InterruptedException t) { Thread.currentThread().interrupt(); + + if (!stopped) + err = t; } catch (Throwable t) { err = t; @@ -1950,61 +1968,81 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * @param log Logger. */ FileDecompressor(IgniteLogger log) { - super(cctx.igniteInstanceName(), "wal-file-decompressor%" + cctx.igniteInstanceName(), log); + super(cctx.igniteInstanceName(), "wal-file-decompressor%" + cctx.igniteInstanceName(), log, + cctx.kernalContext().workersRegistry()); } /** {@inheritDoc} */ @Override protected void body() { - while (!isCancelled()) { - long segmentToDecompress = -1L; + Throwable err = null; - try { - segmentToDecompress = segmentsQueue.take(); + try { + while (!isCancelled()) { + long segmentToDecompress = -1L; - if (isCancelled()) - break; + try { + segmentToDecompress = segmentsQueue.take(); - File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".zip"); - File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".tmp"); - File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)); + if (isCancelled()) + break; - try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip))); - FileIO io = ioFactory.create(unzipTmp)) { - zis.getNextEntry(); + File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".zip"); + File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".tmp"); + File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)); - while (io.writeFully(arr, 0, zis.read(arr)) > 0) - ; - } + try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip))); + FileIO io = ioFactory.create(unzipTmp)) { + zis.getNextEntry(); - try { - Files.move(unzipTmp.toPath(), unzip.toPath()); - } - catch (FileAlreadyExistsException e) { - U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " + - "[tmp=" + unzipTmp + ", raw=" + unzip + ']', e); + while (io.writeFully(arr, 0, zis.read(arr)) > 0) + ; + } - if (!unzipTmp.delete()) - U.error(log, "Can't delete temporary unzipped segment [tmp=" + unzipTmp + ']'); - } + try { + Files.move(unzipTmp.toPath(), unzip.toPath()); + } + catch (FileAlreadyExistsException e) { + U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " + + "[tmp=" + unzipTmp + ", raw=" + unzip + ']', e); - synchronized (this) { - decompressionFutures.remove(segmentToDecompress).onDone(); - } - } - catch (InterruptedException ignore) { - Thread.currentThread().interrupt(); - } - catch (Throwable t) { - if (!isCancelled && segmentToDecompress != -1L) { - IgniteCheckedException e = new IgniteCheckedException("Error during WAL segment " + - "decompression [segmentIdx=" + segmentToDecompress + ']', t); + if (!unzipTmp.delete()) + U.error(log, "Can't delete temporary unzipped segment [tmp=" + unzipTmp + ']'); + } synchronized (this) { - decompressionFutures.remove(segmentToDecompress).onDone(e); + decompressionFutures.remove(segmentToDecompress).onDone(); + } + } + catch (IOException ex) { + if (!isCancelled && segmentToDecompress != -1L) { + IgniteCheckedException e = new IgniteCheckedException("Error during WAL segment " + + "decompression [segmentIdx=" + segmentToDecompress + ']', ex); + + synchronized (this) { + decompressionFutures.remove(segmentToDecompress).onDone(e); + } } } } } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + if (!isCancelled) + err = e; + } + catch (Throwable t) { + err = t; + } + finally { + if (err == null && !isCancelled) + err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly"); + + if (err instanceof OutOfMemoryError) + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) + cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + } } /** @@ -2030,9 +2068,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda return res; } - /** - * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown. - */ + /** */ private void shutdown() { synchronized (this) { U.cancel(this); http://git-wip-us.apache.org/repos/asf/ignite/blob/b8a421ee/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersControlMXBeanImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersControlMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersControlMXBeanImpl.java index 65f872c..1f082b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersControlMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersControlMXBeanImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.worker; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import org.apache.ignite.internal.util.worker.GridWorker; @@ -41,7 +42,11 @@ public class WorkersControlMXBeanImpl implements WorkersControlMXBean { /** {@inheritDoc} */ @Override public List<String> getWorkerNames() { - return new ArrayList<>(workerRegistry.names()); + List<String> names = new ArrayList<>(workerRegistry.names()); + + Collections.sort(names); + + return names; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b8a421ee/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java new file mode 100644 index 0000000..0df870d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.failure; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.internal.worker.WorkersRegistry; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests system critical workers termination. + */ +public class SystemWorkersTerminationTest extends GridCommonAbstractTest { + /** Handler latch. */ + private static volatile CountDownLatch hndLatch; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setFailureHandler(new TestFailureHandler()); + + DataRegionConfiguration drCfg = new DataRegionConfiguration(); + drCfg.setPersistenceEnabled(true); + + DataStorageConfiguration dsCfg = new DataStorageConfiguration(); + dsCfg.setDefaultDataRegionConfiguration(drCfg); + dsCfg.setWalCompactionEnabled(true); + + cfg.setDataStorageConfiguration(dsCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + deleteWorkFiles(); + + startGrid(0); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + deleteWorkFiles(); + } + + /** + * @throws Exception If failed. + */ + public void testTermination() throws Exception { + Ignite ignite = ignite(0); + + ignite.cluster().active(true); + + WorkersRegistry registry = ((IgniteKernal)ignite).context().workersRegistry(); + + Collection<String> threadNames = new ArrayList<>(registry.names()); + + int cnt = 0; + + for (String threadName : threadNames) { + log.info("Worker termination: " + threadName); + + hndLatch = new CountDownLatch(1); + + GridWorker w = registry.worker(threadName); + + Thread t = w.runner(); + + t.interrupt(); + + assertTrue(hndLatch.await(3, TimeUnit.SECONDS)); + + log.info("Worker is terminated: " + threadName); + + cnt++; + } + + assertEquals(threadNames.size(), cnt); + } + + /** + * @throws Exception If failed. + */ + private void deleteWorkFiles() throws Exception { + cleanPersistenceDir(); + + U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "snapshot", false)); + } + + /** + * Test failure handler. + */ + private class TestFailureHandler implements FailureHandler { + /** {@inheritDoc} */ + @Override public boolean onFailure(Ignite ignite, FailureContext failureCtx) { + hndLatch.countDown(); + + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b8a421ee/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index a107ac9..c75ab7d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -26,6 +26,7 @@ import org.apache.ignite.failure.IoomFailureHandlerTest; import org.apache.ignite.failure.OomFailureHandlerTest; import org.apache.ignite.failure.StopNodeFailureHandlerTest; import org.apache.ignite.failure.StopNodeOrHaltFailureHandlerTest; +import org.apache.ignite.failure.SystemWorkersTerminationTest; import org.apache.ignite.internal.ClassSetTest; import org.apache.ignite.internal.ClusterGroupHostsSelfTest; import org.apache.ignite.internal.ClusterGroupSelfTest; @@ -211,6 +212,7 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTestSuite(IoomFailureHandlerTest.class); suite.addTestSuite(OomFailureHandlerTest.class); suite.addTestSuite(AccountTransferTransactionTest.class); + suite.addTestSuite(SystemWorkersTerminationTest.class); suite.addTestSuite(AtomicOperationsInTxTest.class);
