Repository: asterixdb Updated Branches: refs/heads/master 63b92981e -> a0b29c564
[NO ISSUE][*DB] LogFlusher fixes Change-Id: I19e150f2560573738938967f389a397ad7150a4d Reviewed-on: https://asterix-gerrit.ics.uci.edu/2106 Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/a0b29c56 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/a0b29c56 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/a0b29c56 Branch: refs/heads/master Commit: a0b29c5641cca500428b95a824eae852dfd78c13 Parents: 63b9298 Author: Michael Blow <[email protected]> Authored: Fri Oct 27 12:05:01 2017 -0400 Committer: Michael Blow <[email protected]> Committed: Fri Oct 27 15:22:40 2017 -0700 ---------------------------------------------------------------------- .../asterix/common/transactions/ILogBuffer.java | 3 +- .../asterix/common/utils/InterruptUtil.java | 118 +++++++++++++++++++ .../management/service/logging/LogBuffer.java | 38 +++--- .../management/service/logging/LogManager.java | 89 ++++++-------- 4 files changed, 176 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a0b29c56/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java index 8e67603..6bdce73 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java @@ -34,8 +34,9 @@ public interface ILogBuffer { /** * flush content of buffer to disk + * @param stopping */ - void flush(); + void flush(boolean stopping); /** * @param logSize http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a0b29c56/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java new file mode 100644 index 0000000..4c65c66 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.utils; + +public class InterruptUtil { + /** + * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible + * completes, the current thread will be re-interrupted, if the original operation was interrupted. + */ + public static void doUninterruptibly(Interruptible interruptible) { + boolean interrupted = false; + try { + while (true) { + try { + interruptible.run(); + break; + } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible + * completes, the current thread will be re-interrupted, if the original operation was interrupted. + */ + public static void doExUninterruptibly(ThrowingInterruptible interruptible) throws Exception { + boolean interrupted = false; + try { + while (true) { + try { + interruptible.run(); + break; + } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Executes the passed interruptible, retrying if the operation is interrupted. + * + * @return true if the original operation was interrupted, otherwise false + */ + public static boolean doUninterruptiblyGet(Interruptible interruptible) { + boolean interrupted = false; + while (true) { + try { + interruptible.run(); + break; + } catch (InterruptedException e) { // NOSONAR- contract states caller must handle + interrupted = true; + } + } + return interrupted; + } + + /** + * Executes the passed interruptible, retrying if the operation is interrupted. If the operation throws an + * exception after being previously interrupted, the current thread will be re-interrupted. + * + * @return true if the original operation was interrupted, otherwise false + */ + public static boolean doExUninterruptiblyGet(ThrowingInterruptible interruptible) throws Exception { + boolean interrupted = false; + boolean success = false; + while (true) { + try { + interruptible.run(); + success = true; + break; + } catch (InterruptedException e) { // NOSONAR- contract states caller must handle + interrupted = true; + } finally { + if (!success && interrupted) { + Thread.currentThread().interrupt(); + } + } + } + return interrupted; + } + + @FunctionalInterface + public interface Interruptible { + void run() throws InterruptedException; + } + + @FunctionalInterface + public interface ThrowingInterruptible { + void run() throws Exception; // NOSONAR + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a0b29c56/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java index 081cf02..668eab1 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java @@ -105,15 +105,15 @@ public class LogBuffer implements ILogBuffer { if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT || logRecord.getLogType() == LogType.WAIT) { logRecord.isFlushed(false); - syncCommitQ.offer(logRecord); + syncCommitQ.add(logRecord); } if (logRecord.getLogType() == LogType.FLUSH) { logRecord.isFlushed(false); - flushQ.offer(logRecord); + flushQ.add(logRecord); } } else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) { - remoteJobsQ.offer(logRecord); + remoteJobsQ.add(logRecord); } this.notify(); } @@ -168,29 +168,30 @@ public class LogBuffer implements ILogBuffer { //////////////////////////////////// @Override - public void flush() { + public void flush(boolean stopping) { + boolean interrupted = false; try { int endOffset; while (!full.get()) { - synchronized (this) { - if (appendOffset - flushOffset == 0 && !full.get()) { - try { + try { + synchronized (this) { + if (appendOffset - flushOffset == 0 && !full.get()) { if (IS_DEBUG_MODE) { LOGGER.info("flush()| appendOffset: " + appendOffset + ", flushOffset: " + flushOffset + ", full: " + full.get()); } - if (stop) { + if (stopping || stop) { fileChannel.close(); return; } - this.wait(); - } catch (InterruptedException e) { - continue; + wait(); } + endOffset = appendOffset; } - endOffset = appendOffset; - } internalFlush(flushOffset, endOffset); + } catch (InterruptedException e) { + interrupted = true; + } } internalFlush(flushOffset, appendOffset); if (isLastPage) { @@ -198,6 +199,10 @@ public class LogBuffer implements ILogBuffer { } } catch (IOException e) { throw new IllegalStateException(e); + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } } } @@ -230,7 +235,7 @@ public class LogBuffer implements ILogBuffer { if (endOffset > beginOffset) { logBufferTailReader.initializeScan(beginOffset, endOffset); - ITransactionContext txnCtx = null; + ITransactionContext txnCtx; LogRecord logRecord = logBufferTailReader.next(); while (logRecord != null) { @@ -327,8 +332,9 @@ public class LogBuffer implements ILogBuffer { } @Override - public void stop() { - this.stop = true; + public synchronized void stop() { + stop = true; + notifyAll(); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a0b29c56/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index e5e91e8..5f9369d 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -33,7 +33,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; @@ -51,6 +51,7 @@ import org.apache.asterix.common.transactions.LogManagerProperties; import org.apache.asterix.common.transactions.LogType; import org.apache.asterix.common.transactions.MutableLong; import org.apache.asterix.common.transactions.TxnLogFile; +import org.apache.asterix.common.utils.InterruptUtil; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; public class LogManager implements ILogManager, ILifeCycleComponent { @@ -162,7 +163,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent { } } - /** + /* * To eliminate the case where the modulo of the next appendLSN = 0 (the next * appendLSN = the first LSN of the next log file), we do not allow a log to be * written at the last offset of the current file. @@ -616,13 +617,11 @@ public class LogManager implements ILogManager, ILifeCycleComponent { * The deadlock happens when PrimaryIndexOpeartionTracker.completeOperation results in generating a FLUSH log and there are no empty log buffers available to log it. */ private class FlushLogsLogger extends Thread { - private ILogRecord logRecord; - @Override public void run() { while (true) { try { - logRecord = flushLogsQ.take(); + ILogRecord logRecord = flushLogsQ.take(); appendToLogTail(logRecord); } catch (ACIDException e) { e.printStackTrace(); @@ -641,77 +640,57 @@ class LogFlusher implements Callable<Boolean> { private final LinkedBlockingQueue<ILogBuffer> emptyQ; private final LinkedBlockingQueue<ILogBuffer> flushQ; private final LinkedBlockingQueue<ILogBuffer> stashQ; - private ILogBuffer flushPage; - private final AtomicBoolean isStarted; - private final AtomicBoolean terminateFlag; + private volatile ILogBuffer flushPage; + private volatile boolean stopping; + private final Semaphore started; - public LogFlusher(LogManager logMgr, LinkedBlockingQueue<ILogBuffer> emptyQ, LinkedBlockingQueue<ILogBuffer> flushQ, + LogFlusher(LogManager logMgr, LinkedBlockingQueue<ILogBuffer> emptyQ, LinkedBlockingQueue<ILogBuffer> flushQ, LinkedBlockingQueue<ILogBuffer> stashQ) { this.logMgr = logMgr; this.emptyQ = emptyQ; this.flushQ = flushQ; this.stashQ = stashQ; - flushPage = null; - isStarted = new AtomicBoolean(false); - terminateFlag = new AtomicBoolean(false); - + this.started = new Semaphore(0); } public void terminate() { - //make sure the LogFlusher thread started before terminating it. - synchronized (isStarted) { - while (!isStarted.get()) { - try { - isStarted.wait(); - } catch (InterruptedException e) { - //ignore - } - } - } + // make sure the LogFlusher thread started before terminating it. + InterruptUtil.doUninterruptibly(started::acquire); - terminateFlag.set(true); - if (flushPage != null) { - synchronized (flushPage) { - flushPage.stop(); - flushPage.notify(); - } + stopping = true; + + // we must tell any active flush, if any, to stop + final ILogBuffer currentFlushPage = this.flushPage; + if (currentFlushPage != null) { + currentFlushPage.stop(); } - //[Notice] - //The return value doesn't need to be checked - //since terminateFlag will trigger termination if the flushQ is full. - flushQ.offer(POISON_PILL); + // finally we put a POISON_PILL onto the flushQ to indicate to the flusher it is time to exit + InterruptUtil.doUninterruptibly(() -> flushQ.put(POISON_PILL)); } @Override - public Boolean call() { - synchronized (isStarted) { - isStarted.set(true); - isStarted.notify(); - } + public Boolean call() throws InterruptedException { + started.release(); + boolean interrupted = false; try { while (true) { flushPage = null; - try { - flushPage = flushQ.take(); - if (flushPage == POISON_PILL || terminateFlag.get()) { - return true; - } - } catch (InterruptedException e) { - if (flushPage == null) { - continue; - } + interrupted = InterruptUtil.doUninterruptiblyGet(() -> flushPage = flushQ.take()) || interrupted; + if (flushPage == POISON_PILL) { + return true; } - flushPage.flush(); - emptyQ.offer(flushPage.getLogPageSize() == logMgr.getLogPageSize() ? flushPage : stashQ.remove()); + flushPage.flush(stopping); + + // TODO(mblow): recycle large pages + emptyQ.add(flushPage.getLogPageSize() == logMgr.getLogPageSize() ? flushPage : stashQ.remove()); } } catch (Exception e) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("-------------------------------------------------------------------------"); - LOGGER.info("LogFlusher is terminating abnormally. System is in unusalbe state."); - LOGGER.info("-------------------------------------------------------------------------"); - } - e.printStackTrace(); + LOGGER.log(Level.SEVERE, "LogFlusher is terminating abnormally. System is in unusable state.", e); throw e; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } } } }
