NIFI-3273 This closes #1611. Handle the case of trailing NUL bytes in MinimalLockingWriteAheadLog


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0f2ac39f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0f2ac39f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0f2ac39f

Branch: refs/heads/master
Commit: 0f2ac39f69c1a744f151f0d924c9978f6790b7f7
Parents: 0207f21
Author: Mark Payne <[email protected]>
Authored: Mon Apr 17 13:35:10 2017 -0400
Committer: joewitt <[email protected]>
Committed: Wed Apr 19 22:08:59 2017 -0700

----------------------------------------------------------------------
 .../org/wali/MinimalLockingWriteAheadLog.java   |  69 ++++++++-
 .../test/java/org/wali/DummyRecordSerde.java    |  14 +-
 .../wali/TestMinimalLockingWriteAheadLog.java   | 149 +++++++++++++++++++
 3 files changed, 219 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0f2ac39f/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index 8949073..0914a79 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -663,8 +663,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
      * @param <S> type of record held in the partitions
      */
     private static class Partition<S> {
-
         public static final String JOURNAL_EXTENSION = ".journal";
+        private static final int NUL_BYTE = 0;
         private static final Pattern JOURNAL_FILENAME_PATTERN = 
Pattern.compile("\\d+\\.journal");
 
         private final SerDeFactory<S> serdeFactory;
@@ -1013,6 +1013,17 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
                     transactionId = recoveryIn.readLong();
                 } catch (final EOFException e) {
                     continue;
+                } catch (final Exception e) {
+                    // If the stream consists solely of NUL bytes, then we 
want to treat it
+                    // the same as an EOF because we see this happen when we 
suddenly lose power
+                    // while writing to a file.
+                    if (remainingBytesAllNul(recoveryIn)) {
+                        logger.warn("Failed to recover data from Write-Ahead 
Log Partition because encountered trailing NUL bytes. "
+                            + "This will sometimes happen after a sudden power 
loss. The rest of this journal file will be skipped for recovery purposes.");
+                        continue;
+                    } else {
+                        throw e;
+                    }
                 }
 
                 this.maxTransactionId.set(transactionId);
@@ -1020,6 +1031,27 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
             }
         }
 
+        /**
+         * In the case of a sudden power loss, it is common - at least in a Linux journaling File System -
+         * that the partition file that is being written to will have many trailing "NUL bytes" (0's).
+         * If this happens, then on restart we want to treat this as an incomplete transaction, so we detect
+         * this case explicitly.
+         *
+         * Note that this method consumes the given InputStream: it reads one byte at a time and does not
+         * mark/reset. It stops at the first non-NUL byte, leaving any later bytes unread, or reads all the
+         * way to the end of the stream.
+         *
+         * @param in the input stream to scan
+         * @return <code>true</code> if the InputStream contains no data or contains only NUL bytes
+         * @throws IOException if unable to read from the given InputStream
+         */
+        private boolean remainingBytesAllNul(final InputStream in) throws IOException {
+            int nextByte;
+            // read() returns -1 at end of stream, so reaching the end without seeing a non-NUL byte means "all NUL".
+            while ((nextByte = in.read()) != -1) {
+                if (nextByte != NUL_BYTE) {
+                    return false;
+                }
+            }
+
+            return true;
+        }
+
         private boolean hasMoreData(final InputStream in) throws IOException {
             in.mark(1);
             final int nextByte = in.read();
@@ -1059,7 +1091,40 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
 
             int transactionFlag;
             do {
-                final S record = serde.deserializeEdit(recoveryIn, 
currentRecordMap, recoveryVersion);
+                final S record;
+                try {
+                    record = serde.deserializeEdit(recoveryIn, 
currentRecordMap, recoveryVersion);
+                } catch (final EOFException eof) {
+                    throw eof;
+                } catch (final Exception e) {
+                    // If the stream consists solely of NUL bytes, then we 
want to treat it
+                    // the same as an EOF because we see this happen when we 
suddenly lose power
+                    // while writing to a file. We also have logic already in 
the caller of this
+                    // method to properly handle EOFException's, so we will 
simply throw an EOFException
+                    // ourselves. However, if that is not the case, then 
something else has gone wrong.
+                    // In such a case, there is not much that we can do. If we 
simply skip over the transaction,
+                    // then the transaction may be indicating that a new 
attribute was added or changed. Or the
+                    // content of the FlowFile changed. A subsequent 
transaction for the same FlowFile may then
+                    // update the connection that is holding the FlowFile. In 
this case, if we simply skip over
+                    // the transaction, we end up with a FlowFile in a queue 
that has the wrong attributes or
+                    // content, and that can result in some very bad behavior 
- even security vulnerabilities if
+                    // a Route processor, for instance, routes incorrectly due 
to a missing attribute or content
+                    // is pointing to a previous claim where sensitive values 
have not been removed, etc. So
+                    // instead of attempting to skip the transaction and move 
on, we instead just throw the Exception
+                    // indicating that the write-ahead log is corrupt and 
allow the user to handle it as he/she sees
+                    // fit (likely this will result in deleting the repo, but 
it's possible that it could be repaired
+                    // manually or through some sort of script).
+                    if (remainingBytesAllNul(recoveryIn)) {
+                        final EOFException eof = new EOFException("Failed to 
recover data from Write-Ahead Log Partition because encountered trailing NUL 
bytes. "
+                            + "This will sometimes happen after a sudden power 
loss. The rest of this journal file will be skipped for recovery purposes.");
+                        eof.addSuppressed(e);
+                        throw eof;
+                    } else {
+                        throw e;
+                    }
+                }
+
+
                 if (logger.isDebugEnabled()) {
                     logger.debug("{} Recovering Transaction {}: {}", new 
Object[] { this, maxTransactionId.get(), record });
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f2ac39f/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
index 3a4e79f..e9f3b01 100644
--- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
+++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
@@ -18,13 +18,11 @@ package org.wali;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.IOException;
 import java.util.Map;
 
 public class DummyRecordSerde implements SerDe<DummyRecord> {
 
-    public static final int NUM_UPDATE_TYPES = UpdateType.values().length;
     private int throwIOEAfterNserializeEdits = -1;
     private int throwOOMEAfterNserializeEdits = -1;
     private int serializeEditCount = 0;
@@ -38,7 +36,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
             throw new OutOfMemoryError("Serialized " + (serializeEditCount - 
1) + " records successfully, so now it's time to throw OOME");
         }
 
-        out.write(record.getUpdateType().ordinal());
+        out.writeUTF(record.getUpdateType().name());
         out.writeUTF(record.getId());
 
         if (record.getUpdateType() != UpdateType.DELETE) {
@@ -58,14 +56,8 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
 
     @Override
     public DummyRecord deserializeRecord(final DataInputStream in, final int 
version) throws IOException {
-        final int index = in.read();
-        if (index < 0) {
-            throw new EOFException();
-        }
-        if (index >= NUM_UPDATE_TYPES) {
-            throw new IOException("Corrupt stream; got UpdateType value of " + 
index + " but there are only " + NUM_UPDATE_TYPES + " valid values");
-        }
-        final UpdateType updateType = UpdateType.values()[index];
+        final String updateTypeName = in.readUTF();
+        final UpdateType updateType = UpdateType.valueOf(updateTypeName);
         final String id = in.readUTF();
         final DummyRecord record = new DummyRecord(id, updateType);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f2ac39f/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
index cbca968..ef33f57 100644
--- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
@@ -27,12 +27,15 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -457,6 +460,152 @@ public class TestMinimalLockingWriteAheadLog {
         assertTrue(record3);
     }
 
+
+    /**
+     * Verifies recovery of a journal file that was cut short and then padded with trailing NUL bytes,
+     * simulating a sudden power loss while writing. The first file listed in the "partition-2" directory
+     * is truncated by 8 bytes and 28 NUL bytes are appended. Recovery is then expected to return all three
+     * records, each carrying the property written by the second transaction.
+     */
+    @Test
+    public void testRecoverFileThatHasTrailingNULBytesAndTruncation() throws IOException {
+        final int numPartitions = 5;
+        // NOTE(review): this directory is shared with testRecoverFileThatHasTrailingNULBytesNoTruncation.
+        // Each test deletes it first, so this is only safe if the two tests do not run concurrently.
+        final Path path = Paths.get("target/testRecoverFileThatHasTrailingNULBytes");
+        deleteRecursively(path.toFile());
+        Files.createDirectories(path);
+
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
+        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
+        assertTrue(initialRecs.isEmpty());
+
+        final List<DummyRecord> firstTransaction = new ArrayList<>();
+        firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
+        firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
+        firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
+
+        final List<DummyRecord> secondTransaction = new ArrayList<>();
+        secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
+        secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
+        secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
+
+        final List<DummyRecord> thirdTransaction = new ArrayList<>();
+        thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
+        thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
+
+        repo.update(firstTransaction, true);
+        repo.update(secondTransaction, true);
+        repo.update(thirdTransaction, true);
+
+        repo.shutdown();
+
+        // NOTE(review): File.listFiles() order is unspecified; this assumes the partition directory holds a single journal file.
+        final File partition3Dir = path.resolve("partition-2").toFile();
+        final File journalFile = partition3Dir.listFiles()[0];
+        final byte[] contents = Files.readAllBytes(journalFile.toPath());
+
+        // Truncate the contents of the journal file by 8 bytes. Then append 28 trailing NUL bytes,
+        // as this is what we often see when we have a sudden power loss.
+        final byte[] truncated = Arrays.copyOfRange(contents, 0, contents.length - 8);
+        // A new byte[] is zero-filled, so every byte after the copied prefix is a NUL byte.
+        final byte[] withNuls = new byte[truncated.length + 28];
+        System.arraycopy(truncated, 0, withNuls, 0, truncated.length);
+
+        // Overwrite (not append): the file now holds the truncated journal followed by the NUL padding.
+        try (final OutputStream fos = new FileOutputStream(journalFile)) {
+            fos.write(withNuls);
+        }
+
+        // NOTE(review): recoverRepo is never shut down in this test.
+        final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
+        final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
+        // All three records survive with their second-transaction properties, i.e. the DELETEs of "1" and "2" from the
+        // third transaction are not applied (presumably because the truncation cut into that transaction).
+        assertFalse(recoveredRecords.isEmpty());
+        assertEquals(3, recoveredRecords.size());
+
+        boolean record1 = false, record2 = false, record3 = false;
+        for (final DummyRecord record : recoveredRecords) {
+            switch (record.getId()) {
+                case "1":
+                    record1 = true;
+                    assertEquals("123", record.getProperty("abc"));
+                    break;
+                case "2":
+                    record2 = true;
+                    assertEquals("123", record.getProperty("cba"));
+                    break;
+                case "3":
+                    record3 = true;
+                    assertEquals("123", record.getProperty("aaa"));
+                    break;
+            }
+        }
+
+        assertTrue(record1);
+        assertTrue(record2);
+        assertTrue(record3);
+    }
+
+    /**
+     * Verifies recovery of a journal file that is otherwise intact but has 28 NUL bytes appended to it,
+     * simulating a sudden power loss while writing. All three transactions are expected to be recovered,
+     * so after the DELETEs of "1" and "2" only record "3" remains.
+     */
+    @Test
+    public void testRecoverFileThatHasTrailingNULBytesNoTruncation() throws IOException {
+        final int numPartitions = 5;
+        // NOTE(review): this directory is shared with testRecoverFileThatHasTrailingNULBytesAndTruncation.
+        // Each test deletes it first, so this is only safe if the two tests do not run concurrently.
+        final Path path = Paths.get("target/testRecoverFileThatHasTrailingNULBytes");
+        deleteRecursively(path.toFile());
+        Files.createDirectories(path);
+
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
+        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
+        assertTrue(initialRecs.isEmpty());
+
+        final List<DummyRecord> firstTransaction = new ArrayList<>();
+        firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
+        firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
+        firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
+
+        final List<DummyRecord> secondTransaction = new ArrayList<>();
+        secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
+        secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
+        secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
+
+        final List<DummyRecord> thirdTransaction = new ArrayList<>();
+        thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
+        thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
+
+        repo.update(firstTransaction, true);
+        repo.update(secondTransaction, true);
+        repo.update(thirdTransaction, true);
+
+        repo.shutdown();
+
+        // NOTE(review): File.listFiles() order is unspecified; this assumes the partition directory holds a single journal file.
+        final File partition3Dir = path.resolve("partition-2").toFile();
+        final File journalFile = partition3Dir.listFiles()[0];
+
+        // Do NOT truncate the journal file; just append 28 trailing NUL bytes (a new byte[] is zero-filled),
+        // as this is what we often see when we have a sudden power loss.
+        final byte[] withNuls = new byte[28];
+
+        // Opening the stream in append mode keeps the existing journal contents intact.
+        try (final OutputStream fos = new FileOutputStream(journalFile, true)) {
+            fos.write(withNuls);
+        }
+
+        // NOTE(review): recoverRepo is never shut down in this test.
+        final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
+        final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
+        assertFalse(recoveredRecords.isEmpty());
+        assertEquals(1, recoveredRecords.size());
+
+        // Records "1" and "2" must be absent or be DELETEs; record "3" must be present with its UPDATE property.
+        boolean record1 = false, record2 = false, record3 = false;
+        for (final DummyRecord record : recoveredRecords) {
+            switch (record.getId()) {
+                case "1":
+                    record1 = record.getUpdateType() != UpdateType.DELETE;
+                    assertEquals("123", record.getProperty("abc"));
+                    break;
+                case "2":
+                    record2 = record.getUpdateType() != UpdateType.DELETE;
+                    assertEquals("123", record.getProperty("cba"));
+                    break;
+                case "3":
+                    record3 = true;
+                    assertEquals("123", record.getProperty("aaa"));
+                    break;
+            }
+        }
+
+        assertFalse(record1);
+        assertFalse(record2);
+        assertTrue(record3);
+    }
+
     @Test
     public void testCannotModifyLogAfterAllAreBlackListed() throws IOException 
{
         final int numPartitions = 5;

Reply via email to