Repository: nifi
Updated Branches:
  refs/heads/master 8f4d13eea -> 5872eb3c4


NIFI-5331: When checkpointing SequentialAccessWriteAheadLog, roll over the 
journal if it is not healthy. Also ensure that if an Exception is thrown while 
attempting to fsync() or close() the journal, we still continue on to create 
a new one.
This closes #2952.
Signed-off-by: Brandon Devries <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5872eb3c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5872eb3c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5872eb3c

Branch: refs/heads/master
Commit: 5872eb3c4a060684a88555f1c697f07bec4c26dd
Parents: 8f4d13e
Author: Mark Payne <[email protected]>
Authored: Wed Aug 15 10:23:49 2018 -0400
Committer: Brandon Devries <[email protected]>
Committed: Thu Oct 4 15:25:53 2018 -0400

----------------------------------------------------------------------
 .../.SequentialAccessWriteAheadLog.java.swp     | Bin 0 -> 16384 bytes
 .../nifi/wali/LengthDelimitedJournal.java       |  20 ++++++++------
 .../wali/SequentialAccessWriteAheadLog.java     |  27 ++++++++++++-------
 .../org/apache/nifi/wali/WriteAheadJournal.java |   5 ++++
 4 files changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5872eb3c/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/.SequentialAccessWriteAheadLog.java.swp
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/.SequentialAccessWriteAheadLog.java.swp
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/.SequentialAccessWriteAheadLog.java.swp
new file mode 100644
index 0000000..1d5f641
Binary files /dev/null and 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/.SequentialAccessWriteAheadLog.java.swp
 differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/5872eb3c/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
index 0b2a8d3..c10d366 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
@@ -17,6 +17,14 @@
 
 package org.apache.nifi.wali;
 
+import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDe;
+import org.wali.SerDeFactory;
+import org.wali.UpdateType;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
@@ -38,14 +46,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.nifi.stream.io.ByteCountingInputStream;
-import org.apache.nifi.stream.io.LimitingInputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.SerDe;
-import org.wali.SerDeFactory;
-import org.wali.UpdateType;
-
 public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
     private static final Logger logger = 
LoggerFactory.getLogger(LengthDelimitedJournal.class);
     private static final JournalSummary INACTIVE_JOURNAL_SUMMARY = new 
StandardJournalSummary(-1L, -1L, 0);
@@ -90,6 +90,10 @@ public class LengthDelimitedJournal<T> implements 
WriteAheadJournal<T> {
         return bufferedOut;
     }
 
+    @Override
+    public synchronized boolean isHealthy() {
+        return !closed && !poisoned;
+    }
 
     @Override
     public synchronized void writeHeader() throws IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/5872eb3c/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
index e4a1db7..cba5184 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
@@ -17,6 +17,12 @@
 
 package org.apache.nifi.wali;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDeFactory;
+import org.wali.SyncListener;
+import org.wali.WriteAheadRepository;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
@@ -32,12 +38,6 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.SerDeFactory;
-import org.wali.SyncListener;
-import org.wali.WriteAheadRepository;
-
 /**
  * <p>
  * This implementation of WriteAheadRepository provides the ability to write 
all updates to the
@@ -251,13 +251,22 @@ public class SequentialAccessWriteAheadLog<T> implements 
WriteAheadRepository<T>
         try {
             if (journal != null) {
                 final JournalSummary journalSummary = journal.getSummary();
-                if (journalSummary.getTransactionCount() == 0) {
+                if (journalSummary.getTransactionCount() == 0 && 
journal.isHealthy()) {
                     logger.debug("Will not checkpoint Write-Ahead Log because 
no updates have occurred since last checkpoint");
                     return snapshot.getRecordCount();
                 }
 
-                journal.fsync();
-                journal.close();
+                try {
+                    journal.fsync();
+                } catch (final Exception e) {
+                    logger.error("Failed to synch Write-Ahead Log's journal to 
disk at {}", storageDirectory, e);
+                }
+
+                try {
+                    journal.close();
+                } catch (final Exception e) {
+                    logger.error("Failed to close Journal while attempting to 
checkpoint Write-Ahead Log at {}", storageDirectory);
+                }
 
                 nextTransactionId = Math.max(nextTransactionId, 
journalSummary.getLastTransactionId() + 1);
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/5872eb3c/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java
index f35d47a..c44e1cb 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java
@@ -48,4 +48,9 @@ public interface WriteAheadJournal<T> extends Closeable {
      * @throws IOException if unable to write to the underlying storage 
mechanism.
      */
     JournalSummary getSummary();
+
+    /**
+     * @return <code>true</code> if the journal is healthy and can be written 
to, <code>false</code> if either the journal has been closed or is poisoned
+     */
+    boolean isHealthy();
 }

Reply via email to