NIFI-800: Ensured that any Throwable thrown while updating a Partition marks that Partition as unusable until a checkpoint occurs
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/34e08ba7 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/34e08ba7 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/34e08ba7 Branch: refs/heads/NIFI-744 Commit: 34e08ba7752a8543df4155951d57cb432ed2fc63 Parents: 496ebfb Author: Mark Payne <[email protected]> Authored: Mon Aug 3 08:43:18 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Mon Aug 3 08:43:18 2015 -0400 ---------------------------------------------------------------------- .../org/wali/MinimalLockingWriteAheadLog.java | 18 +- .../src/test/java/org/wali/DummyRecord.java | 5 + .../test/java/org/wali/DummyRecordSerde.java | 8 + .../wali/TestMinimalLockingWriteAheadLog.java | 215 +++++++++++++++++++ 4 files changed, 238 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/34e08ba7/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java index 5c8b4c8..39fe65a 100644 --- a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java +++ b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java @@ -60,7 +60,7 @@ import java.util.regex.Pattern; import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.stream.io.BufferedOutputStream; - +import org.apache.nifi.stream.io.ByteCountingInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,10 +226,10 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor try { 
partition.update(records, transactionId, unmodifiableRecordMap, forceSync); - } catch (final Exception e) { + } catch (final Throwable t) { partition.blackList(); numberBlackListedPartitions.incrementAndGet(); - throw e; + throw t; } if (forceSync && syncListener != null) { @@ -511,9 +511,10 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor for (final Partition<T> partition : partitions) { try { partition.rollover(); - } catch (final IOException ioe) { + } catch (final Throwable t) { partition.blackList(); - throw ioe; + numberBlackListedPartitions.getAndIncrement(); + throw t; } } @@ -878,7 +879,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor } private DataInputStream createDataInputStream(final Path path) throws IOException { - return new DataInputStream(new BufferedInputStream(Files.newInputStream(path))); + return new DataInputStream(new ByteCountingInputStream(new BufferedInputStream(Files.newInputStream(path)))); } private DataInputStream getRecoveryStream() throws IOException { @@ -892,6 +893,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor return null; } + logger.debug("{} recovering from {}", this, nextRecoveryPath); recoveryIn = createDataInputStream(nextRecoveryPath); if (hasMoreData(recoveryIn)) { final String waliImplementationClass = recoveryIn.readUTF(); @@ -972,8 +974,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor int transactionFlag; do { final S record = serde.deserializeEdit(recoveryIn, currentRecordMap, recoveryVersion); - if (logger.isTraceEnabled()) { - logger.trace("{} Recovering Transaction {}: {}", new Object[]{this, maxTransactionId.get(), record}); + if (logger.isDebugEnabled()) { + logger.debug("{} Recovering Transaction {}: {}", new Object[] { this, maxTransactionId.get(), record }); } final Object recordId = serde.getRecordIdentifier(record); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/34e08ba7/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java index e0f7f96..bf15ba7 100644 --- a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java +++ b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java @@ -58,4 +58,9 @@ public class DummyRecord { public String getProperty(final String name) { return props.get(name); } + + @Override + public String toString() { + return "DummyRecord [id=" + id + ", props=" + props + ", updateType=" + updateType + "]"; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/34e08ba7/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java index 8cc7860..3a4e79f 100644 --- a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java +++ b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java @@ -26,6 +26,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> { public static final int NUM_UPDATE_TYPES = UpdateType.values().length; private int throwIOEAfterNserializeEdits = -1; + private int throwOOMEAfterNserializeEdits = -1; private int serializeEditCount = 0; @Override @@ -33,6 +34,9 @@ public class DummyRecordSerde implements SerDe<DummyRecord> { if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwIOEAfterNserializeEdits)) { throw new IOException("Serialized " + (serializeEditCount - 1) + " records 
successfully, so now it's time to throw IOE"); } + if (throwOOMEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwOOMEAfterNserializeEdits)) { + throw new OutOfMemoryError("Serialized " + (serializeEditCount - 1) + " records successfully, so now it's time to throw OOME"); + } out.write(record.getUpdateType().ordinal()); out.writeUTF(record.getId()); @@ -100,6 +104,10 @@ public class DummyRecordSerde implements SerDe<DummyRecord> { this.throwIOEAfterNserializeEdits = n; } + public void setThrowOOMEAfterNSerializeEdits(final int n) { + this.throwOOMEAfterNserializeEdits = n; + } + @Override public String getLocation(final DummyRecord record) { return null; http://git-wip-us.apache.org/repos/asf/nifi/blob/34e08ba7/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java index 57f3495..29d2e7f 100644 --- a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java +++ b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java @@ -28,16 +28,180 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestMinimalLockingWriteAheadLog { + private static final Logger 
logger = LoggerFactory.getLogger(TestMinimalLockingWriteAheadLog.class); + + + @Test + public void testRepoDoesntContinuallyGrowOnOutOfMemoryError() throws IOException, InterruptedException { + final int numPartitions = 8; + + final Path path = Paths.get("target/minimal-locking-repo"); + deleteRecursively(path.toFile()); + assertTrue(path.toFile().mkdirs()); + + final DummyRecordSerde serde = new DummyRecordSerde(); + final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null); + try { + final Collection<DummyRecord> initialRecs = repo.recoverRecords(); + assertTrue(initialRecs.isEmpty()); + + serde.setThrowOOMEAfterNSerializeEdits(100); + for (int i = 0; i < 108; i++) { + try { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + repo.update(Collections.singleton(record), false); + } catch (final OutOfMemoryError oome) { + logger.info("Received OOME on record " + i); + } + } + + long expectedSize = sizeOf(path.toFile()); + for (int i = 0; i < 1000; i++) { + try { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + repo.update(Collections.singleton(record), false); + Assert.fail("Expected IOE but it didn't happen"); + } catch (final IOException ioe) { + // will get IOException because all Partitions have been blacklisted + } + } + + long newSize = sizeOf(path.toFile()); + assertEquals(expectedSize, newSize); + + try { + repo.checkpoint(); + Assert.fail("Expected OOME but it didn't happen"); + } catch (final OutOfMemoryError oome) { + } + + expectedSize = sizeOf(path.toFile()); + + for (int i = 0; i < 100000; i++) { + try { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + repo.update(Collections.singleton(record), false); + Assert.fail("Expected IOE but it didn't happen"); + } catch (final IOException ioe) { + // will get IOException because all Partitions have been blacklisted + } + } + + newSize = 
sizeOf(path.toFile()); + assertEquals(expectedSize, newSize); + } finally { + repo.shutdown(); + } + } + + /** + * This test is intended to continually update the Write-ahead log using many threads, then + * stop and restore the repository to check for any corruption. There were reports of potential threading + * issues leading to repository corruption. This was an attempt to replicate. It should not be run as a + * unit test, really, but will be left, as it can be valuable to exercise the implementation + * + * @throws IOException if unable to read from/write to the write-ahead log + * @throws InterruptedException if a thread is interrupted + */ + @Test + @Ignore + public void tryToCauseThreadingIssue() throws IOException, InterruptedException { + System.setProperty("org.slf4j.simpleLogger.log.org.wali", "INFO"); + + final int numThreads = 12; + final long iterationsPerThread = 1000000; + final int numAttempts = 1000; + + final Path path = Paths.get("D:/dummy/minimal-locking-repo"); + path.toFile().mkdirs(); + + final AtomicReference<WriteAheadRepository<DummyRecord>> writeRepoRef = new AtomicReference<>(); + final AtomicBoolean checkpointing = new AtomicBoolean(false); + + final Thread bgThread = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + checkpointing.set(true); + + final WriteAheadRepository<DummyRecord> repo = writeRepoRef.get(); + if (repo != null) { + try { + repo.checkpoint(); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + } + + checkpointing.set(false); + + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + } + } + } + }); + bgThread.setDaemon(true); + bgThread.start(); + + for (int x = 0; x < numAttempts; x++) { + final DummyRecordSerde serde = new DummyRecordSerde(); + final WriteAheadRepository<DummyRecord> writeRepo = new MinimalLockingWriteAheadLog<>(path, 256, serde, null); + final Collection<DummyRecord> writeRecords = writeRepo.recoverRecords(); + for (final 
DummyRecord record : writeRecords) { + assertEquals("B", record.getProperty("A")); + } + + writeRepoRef.set(writeRepo); + + final Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < numThreads; i++) { + final Thread t = new InlineCreationInsertThread(iterationsPerThread, writeRepo); + t.start(); + threads[i] = t; + } + + for (final Thread t : threads) { + t.join(); + } + + writeRepoRef.set(null); + writeRepo.shutdown(); + + boolean cp = checkpointing.get(); + while (cp) { + Thread.sleep(100L); + cp = checkpointing.get(); + } + + final WriteAheadRepository<DummyRecord> readRepo = new MinimalLockingWriteAheadLog<>(path, 256, serde, null); + // ensure that we are able to recover the records properly + final Collection<DummyRecord> readRecords = readRepo.recoverRecords(); + for (final DummyRecord record : readRecords) { + assertEquals("B", record.getProperty("A")); + } + readRepo.shutdown(); + } + } @Test public void testWrite() throws IOException, InterruptedException { @@ -285,6 +449,40 @@ public class TestMinimalLockingWriteAheadLog { } } + + private static class InlineCreationInsertThread extends Thread { + private final long iterations; + private final WriteAheadRepository<DummyRecord> repo; + + public InlineCreationInsertThread(final long numInsertions, final WriteAheadRepository<DummyRecord> repo) { + this.iterations = numInsertions; + this.repo = repo; + } + + @Override + public void run() { + final List<DummyRecord> list = new ArrayList<>(1); + list.add(null); + final UpdateType[] updateTypes = new UpdateType[] { UpdateType.CREATE, UpdateType.DELETE, UpdateType.UPDATE }; + final Random random = new Random(); + + for (long i = 0; i < iterations; i++) { + final int updateTypeIndex = random.nextInt(updateTypes.length); + final UpdateType updateType = updateTypes[updateTypeIndex]; + + final DummyRecord record = new DummyRecord(String.valueOf(i), updateType); + record.setProperty("A", "B"); + list.set(0, record); + + try { + repo.update(list, false); + 
} catch (final Throwable t) { + t.printStackTrace(); + } + } + } + } + private void deleteRecursively(final File file) { final File[] children = file.listFiles(); if (children != null) { @@ -295,4 +493,21 @@ public class TestMinimalLockingWriteAheadLog { file.delete(); } + + private long sizeOf(final File file) { + long size = 0L; + if (file.isDirectory()) { + final File[] children = file.listFiles(); + if (children != null) { + for (final File child : children) { + size += sizeOf(child); + } + } + } + + size += file.length(); + + return size; + } + }
