Repository: nifi Updated Branches: refs/heads/master 8f4d13eea -> 5872eb3c4
NIFI-5331: When checkpointing SequentialAccessWriteAheadLog, if the journal is not healthy, ensure that we roll it over and ensure that if an Exception is thrown when attempting to fsync() or close() the journal, we continue creating a new one. This closes #2952. Signed-off-by: Brandon Devries <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5872eb3c Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5872eb3c Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5872eb3c Branch: refs/heads/master Commit: 5872eb3c4a060684a88555f1c697f07bec4c26dd Parents: 8f4d13e Author: Mark Payne <[email protected]> Authored: Wed Aug 15 10:23:49 2018 -0400 Committer: Brandon Devries <[email protected]> Committed: Thu Oct 4 15:25:53 2018 -0400 ---------------------------------------------------------------------- .../.SequentialAccessWriteAheadLog.java.swp | Bin 0 -> 16384 bytes .../nifi/wali/LengthDelimitedJournal.java | 20 ++++++++------ .../wali/SequentialAccessWriteAheadLog.java | 27 ++++++++++++------- .../org/apache/nifi/wali/WriteAheadJournal.java | 5 ++++ 4 files changed, 35 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/5872eb3c/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/.SequentialAccessWriteAheadLog.java.swp ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/.SequentialAccessWriteAheadLog.java.swp b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/.SequentialAccessWriteAheadLog.java.swp new file mode 100644 index 0000000..1d5f641 Binary files /dev/null and b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/.SequentialAccessWriteAheadLog.java.swp differ http://git-wip-us.apache.org/repos/asf/nifi/blob/5872eb3c/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java index 0b2a8d3..c10d366 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java @@ -17,6 +17,14 @@ package org.apache.nifi.wali; +import org.apache.nifi.stream.io.ByteCountingInputStream; +import org.apache.nifi.stream.io.LimitingInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wali.SerDe; +import org.wali.SerDeFactory; +import org.wali.UpdateType; + import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.ByteArrayOutputStream; @@ -38,14 +46,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.nifi.stream.io.ByteCountingInputStream; -import org.apache.nifi.stream.io.LimitingInputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wali.SerDe; -import org.wali.SerDeFactory; -import org.wali.UpdateType; - public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> { private static final Logger logger = LoggerFactory.getLogger(LengthDelimitedJournal.class); private static final JournalSummary INACTIVE_JOURNAL_SUMMARY = new StandardJournalSummary(-1L, -1L, 0); @@ -90,6 +90,10 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> { return bufferedOut; } + @Override + public synchronized boolean isHealthy() { + return !closed && !poisoned; + } @Override public synchronized void writeHeader() throws IOException { http://git-wip-us.apache.org/repos/asf/nifi/blob/5872eb3c/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java index e4a1db7..cba5184 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java @@ -17,6 +17,12 @@ package org.apache.nifi.wali; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wali.SerDeFactory; +import org.wali.SyncListener; +import org.wali.WriteAheadRepository; + import java.io.File; import java.io.IOException; import java.util.Arrays; @@ -32,12 +38,6 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wali.SerDeFactory; -import org.wali.SyncListener; -import org.wali.WriteAheadRepository; - /** * <p> * This implementation of WriteAheadRepository provides the ability to write all updates to the @@ -251,13 +251,22 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T> try { if (journal != null) { final JournalSummary journalSummary = journal.getSummary(); - if (journalSummary.getTransactionCount() == 0) { + if (journalSummary.getTransactionCount() == 0 && journal.isHealthy()) { logger.debug("Will not checkpoint Write-Ahead Log because no updates have occurred since last checkpoint"); return snapshot.getRecordCount(); } - journal.fsync(); - journal.close(); + try { + journal.fsync(); + } catch (final Exception e) { + logger.error("Failed to synch Write-Ahead Log's journal to disk at {}", storageDirectory, e); + } + + try { + journal.close(); + } catch (final Exception e) { + logger.error("Failed to close Journal while attempting to checkpoint Write-Ahead Log at {}", storageDirectory); + } nextTransactionId = Math.max(nextTransactionId, journalSummary.getLastTransactionId() + 1); } http://git-wip-us.apache.org/repos/asf/nifi/blob/5872eb3c/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java index f35d47a..c44e1cb 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java @@ -48,4 +48,9 @@ public interface WriteAheadJournal<T> extends Closeable { * @throws IOException if unable to write to the underlying storage mechanism. */ JournalSummary getSummary(); + + /** + * @return <code>true</code> if the journal is healthy and can be written to, <code>false</code> if either the journal has been closed or is poisoned + */ + boolean isHealthy(); }
