This is an automated email from the ASF dual-hosted git repository.
mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 40dcd15 NIFI-6410: Addressed race condition in LengthDelimitedJournal
in which a Thread could throw an Exception, then another Thread could update
the Journal before the first thread closes it. Added unit test to replicate.
40dcd15 is described below
commit 40dcd1577b5738f06f480196d74c76c779ac057b
Author: Mark Payne <[email protected]>
AuthorDate: Mon Jul 1 15:02:41 2019 -0400
NIFI-6410: Addressed race condition in LengthDelimitedJournal in which a
Thread could throw an Exception, then another Thread could update the Journal
before the first thread closes it. Added unit test to replicate.
This closes #3561
---
.../apache/nifi/wali/LengthDelimitedJournal.java | 34 +++--
.../nifi/wali/TestLengthDelimitedJournal.java | 166 +++++++++++++++++++++
2 files changed, 188 insertions(+), 12 deletions(-)
diff --git
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
index df7868d..e0dcb63 100644
---
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
+++
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
@@ -301,17 +301,27 @@ public class LengthDelimitedJournal<T> implements
WriteAheadJournal<T> {
final long transactionId;
synchronized (this) {
- transactionId = currentTransactionId++;
- transactionCount++;
-
- transactionPreamble.clear();
- transactionPreamble.putLong(transactionId);
- transactionPreamble.putInt(baos.size());
-
- out.write(TRANSACTION_FOLLOWS);
- out.write(transactionPreamble.array());
- baos.writeTo(out);
- out.flush();
+ checkState();
+
+ try {
+ transactionId = currentTransactionId++;
+ transactionCount++;
+
+ transactionPreamble.clear();
+ transactionPreamble.putLong(transactionId);
+ transactionPreamble.putInt(baos.size());
+
+ out.write(TRANSACTION_FOLLOWS);
+ out.write(transactionPreamble.array());
+ baos.writeTo(out);
+ out.flush();
+ } catch (final Throwable t) {
+ // While the outer catch of Throwable that wraps this "catch" will
call poison(), it is imperative that we call poison()
+ // before the synchronized block is exited. Otherwise,
another thread could potentially corrupt the journal before
+ // the poison method closes the file.
+ poison(t);
+ throw t;
+ }
}
logger.debug("Wrote Transaction {} to journal {} with length {}
and {} records", transactionId, journalFile, baos.size(), records.size());
@@ -343,7 +353,7 @@ public class LengthDelimitedJournal<T> implements
WriteAheadJournal<T> {
}
}
- private void poison(final Throwable t) {
+ protected void poison(final Throwable t) {
this.poisoned = true;
try {
diff --git
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
index 448654e..d54ba9c 100644
---
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
+++
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
@@ -26,6 +26,8 @@ import org.wali.SerDeFactory;
import org.wali.SingletonSerDeFactory;
import org.wali.UpdateType;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -42,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
@@ -419,4 +422,167 @@ public class TestLengthDelimitedJournal {
assertEquals(secondRecord, retrieved2);
}
}
+
+ /**
+ * This test is rather complicated and creates a lot of odd objects with
Thread.sleep, etc., and it may not be at all clear what is happening. The
intent of this
+ * test is to try to cause a race condition to occur that would cause the
journal to become corrupt. Consider the following scenario:
+ *
+ * 1. Thread 1 attempts to update the Journal. In the #update method, it
checks the state of the Journal, which is healthy.
+ * 2. Thread 2 attempts to update the Journal. In the #update method, it
checks the state of the Journal, which is healthy.
+ * 3. Thread 1 now throws an Exception when attempting to write to disk (in
this case, an IOException but could also be an OutOfMemoryError, etc.) The
Exception is caught by the Journal,
+ * and the #poison method is called. Before the #poison method is able
to close the underlying Output Stream, Thread 2 is able to write to the Output
Stream (which is no longer guarded
+ * by Thread 1 because of the Exception that was just thrown).
+ * 4. Thread 2 writes to the Journal, after Thread 1 had written only a
partial update. The repository has now become corrupt.
+ * 5. Thread 1 closes the file handle, but does so too late. Corruption
has already occurred!
+ *
+ * In order to replicate the above series of steps in a unit test, we need
to introduce some oddities.
+ *
+ * - The #poison method, when called, needs to wait a bit before actually
closing the underlying FileOutputStream, so that Thread 2 has a chance to run
after Thread 1 stops guarding the Output
+ * Stream and before Thread 1 closes the Output Stream. (This is handled
in the test by subclassing the Journal and overriding the #poison method to
pause).
+ * - We need to ensure that Thread 1 and Thread 2 are both able to pass
the #checkState check in #update before the corruption occurs. (This is handled
in the test by the 'pausingBados' object,
+ * which will enter the #update method, then pause before returning the
ByteArrayOutputStream, which essentially yields to Thread 1, the 'corrupting
thread').
+ * - After both threads have passed the #checkState check, we need Thread
1 to write a partial update, then throw an Exception (This is handled in the
test by the 'corruptingBados' object).
+ * - After the Exception is thrown, we need Thread 2 to update the
contents of the repository before the file is closed. (This is handled in the
test by subclassing the Journal and calling
+ * Thread.sleep in the #poison method).
+ * Note: 'Thread 1' and 'Thread 2' above name roles in the scenario, not the t1/t2 variables below; which of t1/t2 plays each role presumably depends on which one first borrows a stream from the pool.
+ */
+ @Test
+ public void testFailureDuringWriteCannotCauseCorruption() throws
IOException, InterruptedException {
+ // Create a ByteArrayDataOutputStream such that when attempting to
write the data to another OutputStream via its ByteArrayOutputStream,
+ // the BADOS will copy 5 bytes, then throw an IOException. This should
result in the journal being poisoned such that it can no longer
+ // be written to.
+ final ByteArrayDataOutputStream corruptingBados = new
ByteArrayDataOutputStream(4096) {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream() {
+ @Override
+ public synchronized void writeTo(final OutputStream out)
throws IOException {
+ out.write(buf, 0, 5);
+ throw new IOException("Intentional Exception in Unit Test
designed to cause corruption");
+ }
+
+ @Override
+ public synchronized String toString() {
+ return "Corrupting ByteArrayOutputStream[" +
super.toString() + "]";
+ }
+ };
+
+ @Override
+ public ByteArrayOutputStream getByteArrayOutputStream() {
+ return baos;
+ }
+
+ @Override
+ public DataOutputStream getDataOutputStream() {
+ return new DataOutputStream(baos);
+ }
+ };
+
+ // Create a ByteArrayDataOutputStream such that when attempting to
write the data to another OutputStream via its ByteArrayOutputStream,
+ // the BADOS will sleep for 1 second (on the second call to getByteArrayOutputStream()). This allows other
threads to trigger corruption in the repo in the meantime.
+ final ByteArrayDataOutputStream pausingBados = new
ByteArrayDataOutputStream(4096) {
+ private final ByteArrayOutputStream baos = new
ByteArrayOutputStream();
+ private int count = 0; // number of getByteArrayOutputStream() calls made so far
+
+ @Override
+ public ByteArrayOutputStream getByteArrayOutputStream() {
+ // Pause only on the second call to this method (count == 1); every other call returns immediately.
+ if (count++ == 1) {
+ try {
+ Thread.sleep(1000L);
+ } catch (final InterruptedException ie) { // ignored: an interrupt only ends the pause early
+ }
+ }
+
+ return baos;
+ }
+
+ @Override
+ public DataOutputStream getDataOutputStream() {
+ return new DataOutputStream(baos);
+ }
+ };
+
+
+ final Supplier<ByteArrayDataOutputStream> badosSupplier = new
Supplier<ByteArrayDataOutputStream>() {
+ private int count = 0; // number of get() calls; the first returns pausingBados, all later ones corruptingBados
+
+ @Override
+ public ByteArrayDataOutputStream get() {
+ if (count++ == 0) {
+ return pausingBados;
+ }
+
+ return corruptingBados;
+ }
+ };
+
+
+ final ObjectPool<ByteArrayDataOutputStream> corruptingStreamPool = new
BlockingQueuePool<>(2,
+ badosSupplier,
+ stream -> true,
+ stream -> stream.getByteArrayOutputStream().reset());
+
+
+ final Thread[] threads = new Thread[2];
+
+ final LengthDelimitedJournal<DummyRecord> journal = new
LengthDelimitedJournal<DummyRecord>(journalFile, serdeFactory,
corruptingStreamPool, 0L) {
+ private int count = 0; // number of #poison calls made so far; only the first one sleeps
+
+ @Override
+ protected void poison(final Throwable t) {
+ if (count++ == 0) { // it is only important that we sleep the
first time. If we sleep every time, it just slows the test down.
+ try {
+ Thread.sleep(3000L);
+ } catch (InterruptedException e) { // ignored: an interrupt only ends the pause early
+ }
+ }
+
+ super.poison(t);
+ }
+ };
+
+
+ final DummyRecord firstRecord, secondRecord;
+ try {
+ journal.writeHeader();
+
+ firstRecord = new DummyRecord("1", UpdateType.CREATE);
+ secondRecord = new DummyRecord("2", UpdateType.CREATE);
+
+ final Thread t1 = new Thread(() -> {
+ try {
+ journal.update(Collections.singleton(firstRecord), key ->
null);
+ } catch (final IOException ioe) { // ignored: update failures are expected; journal integrity is verified after both threads finish
+ }
+ });
+
+
+ final Thread t2 = new Thread(() -> {
+ try {
+ journal.update(Collections.singleton(secondRecord), key ->
firstRecord);
+ } catch (final IOException ioe) { // ignored: update failures are expected; journal integrity is verified after both threads finish
+ }
+ });
+
+ threads[0] = t1;
+ threads[1] = t2;
+
+ t1.start();
+ t2.start();
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ } finally {
+ journal.close();
+ }
+
+ // Now, attempt to read from the Journal to ensure that it is not
corrupt.
+ try (final LengthDelimitedJournal<DummyRecord> recoveryJournal = new
LengthDelimitedJournal<>(journalFile, serdeFactory, corruptingStreamPool, 0L)) {
+ final Map<Object, DummyRecord> recordMap = new HashMap<>();
+ final Set<String> swapLocations = new HashSet<>();
+
+ recoveryJournal.recoverRecords(recordMap, swapLocations);
+ assertEquals(0, recordMap.size()); // neither record is expected to have been persisted
+ }
+ }
}