Repository: asterixdb Updated Branches: refs/heads/master f9fe6dda8 -> 76ecc4b19
[NO ISSUE][TX] Make TxnLogFile Close Idempotent - user model changes: no - storage format changes: no - interface changes: yes Renamed ILogReader.initializeScan to setPosition and added javadocs. Details: Currently there is an explicit check that the file channel of a TxnLogFile is open before closing it. However, the channel could be closed due to interrupts and therefore we should remove the explicit check and always try to close it. However, we should always decrement the TxnLogFile references counter even if the channel is not open since that TxnLogFile is not accessed anymore. Change-Id: I255e4b9af0bc78298c0a25daf0b5629d413eba6a Reviewed-on: https://asterix-gerrit.ics.uci.edu/2165 Reviewed-by: Till Westmann <[email protected]> Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/76ecc4b1 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/76ecc4b1 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/76ecc4b1 Branch: refs/heads/master Commit: 76ecc4b191a5877f1543c711cbc8869657ce55e9 Parents: f9fe6dd Author: Murtadha Hubail <[email protected]> Authored: Tue Nov 21 01:20:30 2017 +0300 Committer: Murtadha Hubail <[email protected]> Committed: Mon Nov 20 17:17:40 2017 -0800 ---------------------------------------------------------------------- .../apache/asterix/app/nc/RecoveryManager.java | 6 +-- .../asterix/common/transactions/ILogReader.java | 34 +++++++++---- .../management/service/logging/LogManager.java | 2 +- .../management/service/logging/LogReader.java | 52 ++++++++------------ 4 files changed, 49 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76ecc4b1/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 7bc5697..19966fe 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -201,7 +201,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { jobId2WinnerEntitiesMap = new HashMap<>(); //set log reader to the lowWaterMarkLsn ILogRecord logRecord; - logReader.initializeScan(lowWaterMarkLSN); + logReader.setPosition(lowWaterMarkLSN); logRecord = logReader.next(); while (logRecord != null) { if (IS_DEBUG_MODE) { @@ -300,7 +300,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { ILogRecord logRecord = null; try { - logReader.initializeScan(lowWaterMarkLSN); + logReader.setPosition(lowWaterMarkLSN); logRecord = logReader.next(); while (logRecord != null) { if (IS_DEBUG_MODE) { @@ -540,7 +540,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { Set<Integer> activePartitions = localResourceRepository.getActivePartitions(); ILogReader logReader = logMgr.getLogReader(false); try { - logReader.initializeScan(firstLSN); + logReader.setPosition(firstLSN); ILogRecord logRecord = null; while (currentLSN < lastLSN) { logRecord = logReader.next(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76ecc4b1/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java index da188e3..8539e2b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java @@ -18,18 +18,34 @@ */ package org.apache.asterix.common.transactions; -import org.apache.asterix.common.exceptions.ACIDException; - public interface ILogReader { - public void initializeScan(long beginLSN) throws ACIDException; + /** + * Sets the log reader position at log sequence number with value {@code lsn}. + * + * @param lsn + */ + void setPosition(long lsn); - //for scanning - public ILogRecord next() throws ACIDException; + /** + * Reads and returns the log record located at the log reader current position. After reading the log record, + * the log reader position is incremented by the size of the read log. + * + * @return the log record + */ + ILogRecord next(); - //for random reading - public ILogRecord read(long readLSN) throws ACIDException; + /** + * Reads and returns the log record with log sequence number {@code lsn}. + * + * @param lsn + * @return The log record + */ + ILogRecord read(long lsn); - public void close() throws ACIDException; + /** + * Closes the log reader and any resources used. + */ + void close(); -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76ecc4b1/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index dd0a5c7..cdd957a 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -580,7 +580,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent { @Override public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException { if (!fileChannel.isOpen()) { - throw new IllegalStateException("File channel is not open"); + LOGGER.warning(() -> "Closing log file with id(" + logFileRef.getLogFileId() + ") with a closed channel."); } fileChannel.close(); untouchLogFile(logFileRef.getLogFileId()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76ecc4b1/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java index 148aa7e..f2c5eef 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java @@ -30,14 +30,11 @@ import org.apache.asterix.common.transactions.ILogRecord.RecordReadStatus; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.MutableLong; import org.apache.asterix.common.transactions.TxnLogFile; +import org.apache.hyracks.util.annotations.NotThreadSafe; -/** - * NOTE: Many method calls of this class are not thread safe. - * Be very cautious using it in a multithreaded context. - */ +@NotThreadSafe public class LogReader implements ILogReader { - public static final boolean IS_DEBUG_MODE = false;//true private static final Logger LOGGER = Logger.getLogger(LogReader.class.getName()); private final ILogManager logMgr; private final long logFileSize; @@ -54,7 +51,7 @@ public class LogReader implements ILogReader { private enum ReturnState { FLUSH, EOF - }; + } public LogReader(ILogManager logMgr, long logFileSize, int logPageSize, MutableLong flushLSN, boolean isRecoveryMode) { @@ -68,8 +65,8 @@ public class LogReader implements ILogReader { } @Override - public void initializeScan(long beginLSN) throws ACIDException { - readLSN = beginLSN; + public void setPosition(long lsn) { + readLSN = lsn; if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) { return; } @@ -84,7 +81,7 @@ public class LogReader implements ILogReader { * @throws ACIDException */ @Override - public ILogRecord next() throws ACIDException { + public ILogRecord next() { if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) { return null; } @@ -147,13 +144,10 @@ public class LogReader implements ILogReader { return ReturnState.EOF; } try { - if (IS_DEBUG_MODE) { - LOGGER.info( - "waitForFlushOrReturnIfEOF()| flushLSN: " + flushLSN.get() + ", readLSN: " + readLSN); - } flushLSN.wait(); } catch (InterruptedException e) { - //ignore + Thread.currentThread().interrupt(); + throw new ACIDException(e); } } return ReturnState.FLUSH; @@ -166,10 +160,9 @@ public class LogReader implements ILogReader { * @return true if log continues, false if EOF * @throws ACIDException */ - private boolean refillLogReadBuffer() throws ACIDException { + private boolean refillLogReadBuffer() { try { if (readLSN % logFileSize == logFile.size()) { - logFile.close(); readLSN += logFileSize - (readLSN % logFileSize); getLogFile(); } @@ -183,14 +176,12 @@ public class LogReader implements ILogReader { * Fills the log buffer with data from the log file at the current position * * @return false if EOF, true otherwise - * @throws ACIDException */ - - private boolean fillLogReadBuffer() throws ACIDException { + private boolean fillLogReadBuffer() { return fillLogReadBuffer(logPageSize, readBuffer); } - private boolean fillLogReadBuffer(int readSize, ByteBuffer readBuffer) throws ACIDException { + private boolean fillLogReadBuffer(int readSize, ByteBuffer readBuffer) { int size = 0; int read = 0; readBuffer.position(0); @@ -217,10 +208,9 @@ public class LogReader implements ILogReader { return true; } - //for random reading @Override - public ILogRecord read(long LSN) throws ACIDException { - readLSN = LSN; + public ILogRecord read(long lsn) { + readLSN = lsn; //wait for the log to be flushed if needed before trying to read it. synchronized (flushLSN) { while (readLSN >= flushLSN.get()) { @@ -232,15 +222,10 @@ public class LogReader implements ILogReader { } } try { - if (logFile == null) { + if (logFile == null || readLSN < fileBeginLSN || readLSN >= fileBeginLSN + logFile.size()) { //get the log file which contains readLSN getLogFile(); fillLogReadBuffer(); - } else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + logFile.size()) { - //log is not in the current log file - logFile.close(); - getLogFile(); - fillLogReadBuffer(); } else if (readLSN < bufferBeginLSN || readLSN >= bufferBeginLSN + readBuffer.limit()) { //log is not in the current read buffer fillLogReadBuffer(); @@ -265,7 +250,7 @@ public class LogReader implements ILogReader { case TRUNCATED: { if (!fillLogReadBuffer()) { throw new IllegalStateException( - "Could not read LSN(" + LSN + ") from log file id " + logFile.getLogFileId()); + "Could not read LSN(" + lsn + ") from log file id " + logFile.getLogFileId()); } //now read the complete log record continue; @@ -285,8 +270,10 @@ public class LogReader implements ILogReader { return logRecord; } - private void getLogFile() throws ACIDException { + private void getLogFile() { try { + // close existing file (if any) before opening another one + close(); logFile = logMgr.getLogFile(readLSN); fileBeginLSN = logFile.getFileBeginLSN(); } catch (IOException e) { @@ -295,10 +282,11 @@ public class LogReader implements ILogReader { } @Override - public void close() throws ACIDException { + public void close() { try { if (logFile != null) { logFile.close(); + logFile = null; } } catch (IOException e) { throw new ACIDException(e);
