Repository: asterixdb Updated Branches: refs/heads/master d8d4eefbe -> f2604d89c
[ASTERIXDB-2230][TX] Survive Interrupt in Log File Switch - user model changes: no - storage format changes: no - interface changes: yes - ILogBuffer: (-) setLastPage - ILogManager: (-) renewLogFilesAndStartFromLSN Details: - Survive interrupt in log file switch. - Make LogManager responsible for closing log files. - Remove unneeded methods in ILogManager and ILogBuffer. - Adapt log file switch test case to new behavior. Change-Id: I191564c510c0555f191a35e2603e051bbef24540 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2301 Sonar-Qube: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/f2604d89 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/f2604d89 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/f2604d89 Branch: refs/heads/master Commit: f2604d89c42f8b6659288df195f9c082f417e912 Parents: d8d4eef Author: Murtadha Hubail <[email protected]> Authored: Fri Jan 19 18:47:09 2018 +0300 Committer: Murtadha Hubail <[email protected]> Committed: Fri Jan 19 10:12:00 2018 -0800 ---------------------------------------------------------------------- .../apache/asterix/test/txn/LogManagerTest.java | 47 ++++++------------ .../asterix/common/transactions/ILogBuffer.java | 5 -- .../common/transactions/ILogManager.java | 8 --- .../apache/asterix/common/utils/InvokeUtil.java | 33 +++++++++++++ .../management/service/logging/LogBuffer.java | 21 ++------ .../management/service/logging/LogManager.java | 52 ++++++++++---------- 6 files changed, 78 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2604d89/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java index b14d70b..a1978eb 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java @@ -19,8 +19,6 @@ package org.apache.asterix.test.txn; import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -46,7 +44,6 @@ import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.test.common.TestTupleReference; import org.apache.asterix.transaction.management.service.logging.LogManager; import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants; -import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.junit.After; @@ -59,6 +56,7 @@ public class LogManagerTest { protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf"; private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); private static final String PREPARE_NEXT_LOG_FILE_METHOD = "prepareNextLogFile"; + private static final String ENSURE_LAST_PAGE_FLUSHED_METHOD = "ensureLastPageFlushed"; @Before public void setUp() throws Exception { @@ -146,39 +144,20 @@ public class LogManagerTest { int logFileCountBeforeInterrupt = logManager.getLogFileIds().size(); // ensure an interrupted transactor will create next log file but will fail to position the log channel - final AtomicBoolean interrupted = new AtomicBoolean(false); + final 
AtomicBoolean failed = new AtomicBoolean(false); Thread interruptedTransactor = new Thread(() -> { Thread.currentThread().interrupt(); try { prepareNextLogFile(logManager); } catch (Exception e) { - Throwable rootCause = ExceptionUtils.getRootCause(e); - if (rootCause.getCause() instanceof java.nio.channels.ClosedByInterruptException) { - interrupted.set(true); - } + failed.set(true); } }); interruptedTransactor.start(); interruptedTransactor.join(); - // ensure a new log file was created but the thread was interrupt + // ensure a new log file was created and survived interrupt int logFileCountAfterInterrupt = logManager.getLogFileIds().size(); Assert.assertEquals(logFileCountBeforeInterrupt + 1, logFileCountAfterInterrupt); - Assert.assertTrue(interrupted.get()); - - // ensure next transactor will not create another file - final AtomicBoolean failed = new AtomicBoolean(false); - Thread transactor = new Thread(() -> { - try { - prepareNextLogFile(logManager); - } catch (Exception e) { - failed.set(true); - } - }); - transactor.start(); - transactor.join(); - // make sure no new files were created and the operation was successful - int countAfterTransactor = logManager.getLogFileIds().size(); - Assert.assertEquals(logFileCountAfterInterrupt, countAfterTransactor); Assert.assertFalse(failed.get()); // make sure we can still log to the new file @@ -196,14 +175,20 @@ public class LogManagerTest { } private static void prepareNextLogFile(LogManager logManager) throws Exception { - Method method; + Method ensureLastPageFlushed; + Method prepareNextLogFile; + String targetMethod = null; try { - method = LogManager.class.getDeclaredMethod(PREPARE_NEXT_LOG_FILE_METHOD, null); + targetMethod = ENSURE_LAST_PAGE_FLUSHED_METHOD; + ensureLastPageFlushed = LogManager.class.getDeclaredMethod(targetMethod, null); + targetMethod = PREPARE_NEXT_LOG_FILE_METHOD; + prepareNextLogFile = LogManager.class.getDeclaredMethod(targetMethod, null); } catch (Exception e) { - throw new 
IllegalStateException( - "Couldn't find " + PREPARE_NEXT_LOG_FILE_METHOD + " in LogManager. Was it renamed?"); + throw new IllegalStateException("Couldn't find " + targetMethod + " in LogManager. Was it renamed?"); } - method.setAccessible(true); - method.invoke(logManager, null); + ensureLastPageFlushed.setAccessible(true); + ensureLastPageFlushed.invoke(logManager, null); + prepareNextLogFile.setAccessible(true); + prepareNextLogFile.invoke(logManager, null); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2604d89/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java index 6bdce73..b4a7d38 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java @@ -62,11 +62,6 @@ public interface ILogBuffer { void reset(); /** - * Set current page to be the last page of the associated file - */ - void setLastPage(); - - /** * stops the log buffer */ void stop(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2604d89/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java index aa018ba..d7e0885 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java +++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java @@ -63,14 +63,6 @@ public interface ILogManager { public String getNodeId(); /** - * Delete all log files and start new log partition > LSNtoStartFrom - * - * @param LSNtoStartFrom - * @throws IOException - */ - public void renewLogFilesAndStartFromLSN(long LSNtoStartFrom) throws IOException; - - /** * @return the log page size in bytes */ public int getLogPageSize(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2604d89/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java index c718fca..9bdf55c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.utils; import java.io.IOException; +import java.nio.channels.ClosedByInterruptException; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -30,6 +31,9 @@ public class InvokeUtil { private static final Logger LOGGER = LogManager.getLogger(); + private InvokeUtil() { + } + /** * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible * completes, the current thread will be re-interrupted, if the original operation was interrupted. @@ -144,6 +148,31 @@ public class InvokeUtil { return false; } + /** + * Executes the passed interruptible, retrying if the operation fails due to {@link ClosedByInterruptException} or + * {@link InterruptedException}. Once the interruptible completes, the current thread will be re-interrupted, if + * the original operation was interrupted. 
+ */ + public static void doIoUninterruptibly(ThrowingIOInterruptible interruptible) throws IOException { + boolean interrupted = false; + try { + while (true) { + try { + interruptible.run(); + break; + } catch (ClosedByInterruptException | InterruptedException e) { + LOGGER.error("IO operation Interrupted. Retrying..", e); + interrupted = true; + Thread.interrupted(); + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + @FunctionalInterface public interface Interruptible { void run() throws InterruptedException; @@ -154,4 +183,8 @@ public class InvokeUtil { void run() throws Exception; // NOSONAR } + @FunctionalInterface + public interface ThrowingIOInterruptible { + void run() throws IOException, InterruptedException; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2604d89/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java index 011d2a1..614591b 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java @@ -18,7 +18,6 @@ */ package org.apache.asterix.transaction.management.service.logging; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.concurrent.LinkedBlockingQueue; @@ -56,7 +55,6 @@ public class LogBuffer implements ILogBuffer { protected final ByteBuffer appendBuffer; private final ByteBuffer flushBuffer; private final ByteBuffer unlockBuffer; - private boolean 
isLastPage; protected final LinkedBlockingQueue<ILogRecord> syncCommitQ; protected final LinkedBlockingQueue<ILogRecord> flushQ; protected final LinkedBlockingQueue<ILogRecord> remoteJobsQ; @@ -76,7 +74,6 @@ public class LogBuffer implements ILogBuffer { full = new AtomicBoolean(false); appendOffset = 0; flushOffset = 0; - isLastPage = false; syncCommitQ = new LinkedBlockingQueue<>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE); flushQ = new LinkedBlockingQueue<>(); remoteJobsQ = new LinkedBlockingQueue<>(); @@ -132,11 +129,6 @@ public class LogBuffer implements ILogBuffer { } @Override - public void setLastPage() { - this.isLastPage = true; - } - - @Override public boolean hasSpace(int logSize) { return appendOffset + logSize <= logPageSize && !full.get(); } @@ -152,7 +144,6 @@ public class LogBuffer implements ILogBuffer { full.set(false); appendOffset = 0; flushOffset = 0; - isLastPage = false; stop = false; } @@ -174,24 +165,18 @@ public class LogBuffer implements ILogBuffer { + ", full: " + full.get()); } if (stopping || stop) { - fileChannel.close(); return; } wait(); } endOffset = appendOffset; } - internalFlush(flushOffset, endOffset); + internalFlush(flushOffset, endOffset); } catch (InterruptedException e) { interrupted = true; } } internalFlush(flushOffset, appendOffset); - if (isLastPage) { - fileChannel.close(); - } - } catch (IOException e) { - throw new IllegalStateException(e); } finally { if (interrupted) { Thread.currentThread().interrupt(); @@ -235,8 +220,8 @@ public class LogBuffer implements ILogBuffer { reusableTxnId.setId(logRecord.getTxnId()); reusableDatasetId.setId(logRecord.getDatasetId()); txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId); - txnSubsystem.getLockManager().unlock(reusableDatasetId, logRecord.getPKHashValue(), - LockMode.ANY, txnCtx); + txnSubsystem.getLockManager() + .unlock(reusableDatasetId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx); txnCtx.notifyEntityCommitted(); if 
(txnSubsystem.getTransactionProperties().isCommitProfilerEnabled()) { txnSubsystem.incrementEntityCommitCount(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f2604d89/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index 96d0539..c226886 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -93,6 +93,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent { private LogFlusher logFlusher; private Future<? 
extends Object> futureLogFlusher; protected LinkedBlockingQueue<ILogRecord> flushLogsQ; + private long currentLogFileId; public LogManager(ITransactionSubsystem txnSubsystem) { this.txnSubsystem = txnSubsystem; @@ -239,17 +240,18 @@ public class LogManager implements ILogManager, ILifeCycleComponent { private void prepareNextLogFile() { final long nextFileBeginLsn = getNextFileFirstLsn(); try { + closeCurrentLogFile(); createNextLogFile(); - setLogPosition(nextFileBeginLsn); + InvokeUtil.doIoUninterruptibly(() -> setLogPosition(nextFileBeginLsn)); + // move appendLSN and flushLSN to the first LSN of the next log file + // only after the file was created and the channel was positioned successfully + appendLSN.set(nextFileBeginLsn); + flushLSN.set(nextFileBeginLsn); + LOGGER.info("Created new txn log file with id({}) starting with LSN = {}", currentLogFileId, + nextFileBeginLsn); } catch (IOException e) { throw new ACIDException(e); } - // move appendLSN and flushLSN to the first LSN of the next log file - // only after the file was created and the channel was positioned successfully - appendLSN.set(nextFileBeginLsn); - flushLSN.set(nextFileBeginLsn); - LOGGER.info("Created new txn log file with id({}) starting with LSN = {}", getLogFileId(nextFileBeginLsn), - nextFileBeginLsn); } private long getNextFileFirstLsn() { @@ -258,8 +260,6 @@ public class LogManager implements ILogManager, ILifeCycleComponent { } private void ensureLastPageFlushed() { - // Mark the page as the last page so that it will close the output file channel. - appendPage.setLastPage(); // Make sure to flush whatever left in the log tail. 
appendPage.setFull(); synchronized (flushLSN) { @@ -301,6 +301,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent { @Override public void stop(boolean dumpState, OutputStream os) { terminateLogFlusher(); + closeCurrentLogFile(); if (dumpState) { dumpState(os); } @@ -387,6 +388,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent { @Override public void renewLogFiles() { terminateLogFlusher(); + closeCurrentLogFile(); long lastMaxLogFileId = deleteAllLogFiles(); initializeLogManager(lastMaxLogFileId + 1); } @@ -445,13 +447,6 @@ public class LogManager implements ILogManager, ILifeCycleComponent { } private long deleteAllLogFiles() { - if (appendChannel != null) { - try { - appendChannel.close(); - } catch (IOException e) { - throw new IllegalStateException("Failed to close a fileChannel of a log file"); - } - } txnLogFileId2ReaderCount.clear(); List<Long> logFileIds = getLogFileIds(); if (logFileIds != null) { @@ -537,9 +532,22 @@ public class LogManager implements ILogManager, ILifeCycleComponent { final long fileId = getLogFileId(lsn); final Path targetFilePath = Paths.get(getLogFilePath(fileId)); final long targetPosition = getLogFileOffset(lsn); - final RandomAccessFile raf = new RandomAccessFile(targetFilePath.toFile(), "rw"); // NOSONAR closed by LogBuffer + final RandomAccessFile raf = new RandomAccessFile(targetFilePath.toFile(), "rw"); // NOSONAR closed when full appendChannel = raf.getChannel(); appendChannel.position(targetPosition); + currentLogFileId = fileId; + } + + private void closeCurrentLogFile() { + if (appendChannel != null && appendChannel.isOpen()) { + try { + LOGGER.info("closing current log file with id({})", currentLogFileId); + appendChannel.close(); + } catch (IOException e) { + LOGGER.error(() -> "failed to close log file with id(" + currentLogFileId + ")", e); + throw new ACIDException(e); + } + } } @Override @@ -563,14 +571,6 @@ public class LogManager implements ILogManager, 
ILifeCycleComponent { } @Override - public void renewLogFilesAndStartFromLSN(long LSNtoStartFrom) throws IOException { - terminateLogFlusher(); - deleteAllLogFiles(); - long newLogFile = getLogFileId(LSNtoStartFrom); - initializeLogManager(newLogFile + 1); - } - - @Override public void setReplicationManager(IReplicationManager replicationManager) { throw new IllegalStateException("This log manager does not support replication"); } @@ -687,7 +687,7 @@ class LogFlusher implements Callable<Boolean> { } @Override - public Boolean call() throws InterruptedException { + public Boolean call() { started.release(); boolean interrupted = false; try {
