NIFI-800: Ensured that all Throwable that gets thrown when updating a Partition 
marks the Partition as unusable until a checkpoint occurs


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/34e08ba7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/34e08ba7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/34e08ba7

Branch: refs/heads/NIFI-744
Commit: 34e08ba7752a8543df4155951d57cb432ed2fc63
Parents: 496ebfb
Author: Mark Payne <[email protected]>
Authored: Mon Aug 3 08:43:18 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Mon Aug 3 08:43:18 2015 -0400

----------------------------------------------------------------------
 .../org/wali/MinimalLockingWriteAheadLog.java   |  18 +-
 .../src/test/java/org/wali/DummyRecord.java     |   5 +
 .../test/java/org/wali/DummyRecordSerde.java    |   8 +
 .../wali/TestMinimalLockingWriteAheadLog.java   | 215 +++++++++++++++++++
 4 files changed, 238 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/34e08ba7/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
 
b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index 5c8b4c8..39fe65a 100644
--- 
a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ 
b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -60,7 +60,7 @@ import java.util.regex.Pattern;
 
 import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.BufferedOutputStream;
-
+import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -226,10 +226,10 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
 
                         try {
                             partition.update(records, transactionId, 
unmodifiableRecordMap, forceSync);
-                        } catch (final Exception e) {
+                        } catch (final Throwable t) {
                             partition.blackList();
                             numberBlackListedPartitions.incrementAndGet();
-                            throw e;
+                            throw t;
                         }
 
                         if (forceSync && syncListener != null) {
@@ -511,9 +511,10 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
                 for (final Partition<T> partition : partitions) {
                     try {
                         partition.rollover();
-                    } catch (final IOException ioe) {
+                    } catch (final Throwable t) {
                         partition.blackList();
-                        throw ioe;
+                        numberBlackListedPartitions.getAndIncrement();
+                        throw t;
                     }
                 }
 
@@ -878,7 +879,7 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         }
 
         private DataInputStream createDataInputStream(final Path path) throws 
IOException {
-            return new DataInputStream(new 
BufferedInputStream(Files.newInputStream(path)));
+            return new DataInputStream(new ByteCountingInputStream(new 
BufferedInputStream(Files.newInputStream(path))));
         }
 
         private DataInputStream getRecoveryStream() throws IOException {
@@ -892,6 +893,7 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
                     return null;
                 }
 
+                logger.debug("{} recovering from {}", this, nextRecoveryPath);
                 recoveryIn = createDataInputStream(nextRecoveryPath);
                 if (hasMoreData(recoveryIn)) {
                     final String waliImplementationClass = 
recoveryIn.readUTF();
@@ -972,8 +974,8 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
             int transactionFlag;
             do {
                 final S record = serde.deserializeEdit(recoveryIn, 
currentRecordMap, recoveryVersion);
-                if (logger.isTraceEnabled()) {
-                    logger.trace("{} Recovering Transaction {}: {}", new 
Object[]{this, maxTransactionId.get(), record});
+                if (logger.isDebugEnabled()) {
+                    logger.debug("{} Recovering Transaction {}: {}", new 
Object[] { this, maxTransactionId.get(), record });
                 }
 
                 final Object recordId = serde.getRecordIdentifier(record);

http://git-wip-us.apache.org/repos/asf/nifi/blob/34e08ba7/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java
 
b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java
index e0f7f96..bf15ba7 100644
--- 
a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java
+++ 
b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java
@@ -58,4 +58,9 @@ public class DummyRecord {
     public String getProperty(final String name) {
         return props.get(name);
     }
+
+    @Override
+    public String toString() {
+        return "DummyRecord [id=" + id + ", props=" + props + ", updateType=" 
+ updateType + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/34e08ba7/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
 
b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
index 8cc7860..3a4e79f 100644
--- 
a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
+++ 
b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
@@ -26,6 +26,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
 
     public static final int NUM_UPDATE_TYPES = UpdateType.values().length;
     private int throwIOEAfterNserializeEdits = -1;
+    private int throwOOMEAfterNserializeEdits = -1;
     private int serializeEditCount = 0;
 
     @Override
@@ -33,6 +34,9 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
         if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= 
throwIOEAfterNserializeEdits)) {
             throw new IOException("Serialized " + (serializeEditCount - 1) + " 
records successfully, so now it's time to throw IOE");
         }
+        if (throwOOMEAfterNserializeEdits >= 0 && (serializeEditCount++ >= 
throwOOMEAfterNserializeEdits)) {
+            throw new OutOfMemoryError("Serialized " + (serializeEditCount - 
1) + " records successfully, so now it's time to throw OOME");
+        }
 
         out.write(record.getUpdateType().ordinal());
         out.writeUTF(record.getId());
@@ -100,6 +104,10 @@ public class DummyRecordSerde implements 
SerDe<DummyRecord> {
         this.throwIOEAfterNserializeEdits = n;
     }
 
+    public void setThrowOOMEAfterNSerializeEdits(final int n) {
+        this.throwOOMEAfterNserializeEdits = n;
+    }
+
     @Override
     public String getLocation(final DummyRecord record) {
         return null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/34e08ba7/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
 
b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
index 57f3495..29d2e7f 100644
--- 
a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
+++ 
b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
@@ -28,16 +28,180 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestMinimalLockingWriteAheadLog {
+    private static final Logger logger = 
LoggerFactory.getLogger(TestMinimalLockingWriteAheadLog.class);
+
+
+    @Test
+    public void testRepoDoesntContinuallyGrowOnOutOfMemoryError() throws 
IOException, InterruptedException {
+        final int numPartitions = 8;
+
+        final Path path = Paths.get("target/minimal-locking-repo");
+        deleteRecursively(path.toFile());
+        assertTrue(path.toFile().mkdirs());
+
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final WriteAheadRepository<DummyRecord> repo = new 
MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
+        try {
+            final Collection<DummyRecord> initialRecs = repo.recoverRecords();
+            assertTrue(initialRecs.isEmpty());
+
+            serde.setThrowOOMEAfterNSerializeEdits(100);
+            for (int i = 0; i < 108; i++) {
+                try {
+                    final DummyRecord record = new 
DummyRecord(String.valueOf(i), UpdateType.CREATE);
+                    repo.update(Collections.singleton(record), false);
+                } catch (final OutOfMemoryError oome) {
+                    logger.info("Received OOME on record " + i);
+                }
+            }
+
+            long expectedSize = sizeOf(path.toFile());
+            for (int i = 0; i < 1000; i++) {
+                try {
+                    final DummyRecord record = new 
DummyRecord(String.valueOf(i), UpdateType.CREATE);
+                    repo.update(Collections.singleton(record), false);
+                    Assert.fail("Expected IOE but it didn't happen");
+                } catch (final IOException ioe) {
+                    // will get IOException because all Partitions have been 
blacklisted
+                }
+            }
+
+            long newSize = sizeOf(path.toFile());
+            assertEquals(expectedSize, newSize);
+
+            try {
+                repo.checkpoint();
+                Assert.fail("Expected OOME but it didn't happen");
+            } catch (final OutOfMemoryError oome) {
+            }
+
+            expectedSize = sizeOf(path.toFile());
+
+            for (int i = 0; i < 100000; i++) {
+                try {
+                    final DummyRecord record = new 
DummyRecord(String.valueOf(i), UpdateType.CREATE);
+                    repo.update(Collections.singleton(record), false);
+                    Assert.fail("Expected IOE but it didn't happen");
+                } catch (final IOException ioe) {
+                    // will get IOException because all Partitions have been 
blacklisted
+                }
+            }
+
+            newSize = sizeOf(path.toFile());
+            assertEquals(expectedSize, newSize);
+        } finally {
+            repo.shutdown();
+        }
+    }
+
+    /**
+     * This test is intended to continually update the Write-ahead log using 
many threads, then
+     * stop and restore the repository to check for any corruption. There were 
reports of potential threading
+     * issues leading to repository corruption. This was an attempt to 
replicate. It should not be run as a
+     * unit test, really, but will be left, as it can be valuable to exercise 
the implementation
+     *
+     * @throws IOException if unable to read from/write to the write-ahead log
+     * @throws InterruptedException if a thread is interrupted
+     */
+    @Test
+    @Ignore
+    public void tryToCauseThreadingIssue() throws IOException, 
InterruptedException {
+        System.setProperty("org.slf4j.simpleLogger.log.org.wali", "INFO");
+
+        final int numThreads = 12;
+        final long iterationsPerThread = 1000000;
+        final int numAttempts = 1000;
+
+        final Path path = Paths.get("D:/dummy/minimal-locking-repo");
+        path.toFile().mkdirs();
+
+        final AtomicReference<WriteAheadRepository<DummyRecord>> writeRepoRef 
= new AtomicReference<>();
+        final AtomicBoolean checkpointing = new AtomicBoolean(false);
+
+        final Thread bgThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (true) {
+                    checkpointing.set(true);
+
+                    final WriteAheadRepository<DummyRecord> repo = 
writeRepoRef.get();
+                    if (repo != null) {
+                        try {
+                            repo.checkpoint();
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                            Assert.fail();
+                        }
+                    }
+
+                    checkpointing.set(false);
+
+                    try {
+                        TimeUnit.SECONDS.sleep(5);
+                    } catch (InterruptedException e) {
+                    }
+                }
+            }
+        });
+        bgThread.setDaemon(true);
+        bgThread.start();
+
+        for (int x = 0; x < numAttempts; x++) {
+            final DummyRecordSerde serde = new DummyRecordSerde();
+            final WriteAheadRepository<DummyRecord> writeRepo = new 
MinimalLockingWriteAheadLog<>(path, 256, serde, null);
+            final Collection<DummyRecord> writeRecords = 
writeRepo.recoverRecords();
+            for (final DummyRecord record : writeRecords) {
+                assertEquals("B", record.getProperty("A"));
+            }
+
+            writeRepoRef.set(writeRepo);
+
+            final Thread[] threads = new Thread[numThreads];
+            for (int i = 0; i < numThreads; i++) {
+                final Thread t = new 
InlineCreationInsertThread(iterationsPerThread, writeRepo);
+                t.start();
+                threads[i] = t;
+            }
+
+            for (final Thread t : threads) {
+                t.join();
+            }
+
+            writeRepoRef.set(null);
+            writeRepo.shutdown();
+
+            boolean cp = checkpointing.get();
+            while (cp) {
+                Thread.sleep(100L);
+                cp = checkpointing.get();
+            }
+
+            final WriteAheadRepository<DummyRecord> readRepo = new 
MinimalLockingWriteAheadLog<>(path, 256, serde, null);
+            // ensure that we are able to recover the records properly
+            final Collection<DummyRecord> readRecords = 
readRepo.recoverRecords();
+            for (final DummyRecord record : readRecords) {
+                assertEquals("B", record.getProperty("A"));
+            }
+            readRepo.shutdown();
+        }
+    }
 
     @Test
     public void testWrite() throws IOException, InterruptedException {
@@ -285,6 +449,40 @@ public class TestMinimalLockingWriteAheadLog {
         }
     }
 
+
+    private static class InlineCreationInsertThread extends Thread {
+        private final long iterations;
+        private final WriteAheadRepository<DummyRecord> repo;
+
+        public InlineCreationInsertThread(final long numInsertions, final 
WriteAheadRepository<DummyRecord> repo) {
+            this.iterations = numInsertions;
+            this.repo = repo;
+        }
+
+        @Override
+        public void run() {
+            final List<DummyRecord> list = new ArrayList<>(1);
+            list.add(null);
+            final UpdateType[] updateTypes = new UpdateType[] { 
UpdateType.CREATE, UpdateType.DELETE, UpdateType.UPDATE };
+            final Random random = new Random();
+
+            for (long i = 0; i < iterations; i++) {
+                final int updateTypeIndex = random.nextInt(updateTypes.length);
+                final UpdateType updateType = updateTypes[updateTypeIndex];
+
+                final DummyRecord record = new DummyRecord(String.valueOf(i), 
updateType);
+                record.setProperty("A", "B");
+                list.set(0, record);
+
+                try {
+                    repo.update(list, false);
+                } catch (final Throwable t) {
+                    t.printStackTrace();
+                }
+            }
+        }
+    }
+
     private void deleteRecursively(final File file) {
         final File[] children = file.listFiles();
         if (children != null) {
@@ -295,4 +493,21 @@ public class TestMinimalLockingWriteAheadLog {
 
         file.delete();
     }
+
+    private long sizeOf(final File file) {
+        long size = 0L;
+        if (file.isDirectory()) {
+            final File[] children = file.listFiles();
+            if (children != null) {
+                for (final File child : children) {
+                    size += sizeOf(child);
+                }
+            }
+        }
+
+        size += file.length();
+
+        return size;
+    }
+
 }

Reply via email to