This is an automated email from the ASF dual-hosted git repository.

mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 40dcd15  NIFI-6410: Addressed race condition in LengthDelimitedJournal 
in which a Thread could throw an Exception, then another Thread could update 
the Journal before the first thread closes it. Added unit test to replicate.
40dcd15 is described below

commit 40dcd1577b5738f06f480196d74c76c779ac057b
Author: Mark Payne <[email protected]>
AuthorDate: Mon Jul 1 15:02:41 2019 -0400

    NIFI-6410: Addressed race condition in LengthDelimitedJournal in which a 
Thread could throw an Exception, then another Thread could update the Journal 
before the first thread closes it. Added unit test to replicate.
    
    This closes #3561
---
 .../apache/nifi/wali/LengthDelimitedJournal.java   |  34 +++--
 .../nifi/wali/TestLengthDelimitedJournal.java      | 166 +++++++++++++++++++++
 2 files changed, 188 insertions(+), 12 deletions(-)

diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
index df7868d..e0dcb63 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
@@ -301,17 +301,27 @@ public class LengthDelimitedJournal<T> implements 
WriteAheadJournal<T> {
 
             final long transactionId;
             synchronized (this) {
-                transactionId = currentTransactionId++;
-                transactionCount++;
-
-                transactionPreamble.clear();
-                transactionPreamble.putLong(transactionId);
-                transactionPreamble.putInt(baos.size());
-
-                out.write(TRANSACTION_FOLLOWS);
-                out.write(transactionPreamble.array());
-                baos.writeTo(out);
-                out.flush();
+                checkState();
+
+                try {
+                    transactionId = currentTransactionId++;
+                    transactionCount++;
+
+                    transactionPreamble.clear();
+                    transactionPreamble.putLong(transactionId);
+                    transactionPreamble.putInt(baos.size());
+
+                    out.write(TRANSACTION_FOLLOWS);
+                    out.write(transactionPreamble.array());
+                    baos.writeTo(out);
+                    out.flush();
+                } catch (final Throwable t) {
+                    // While the outer catch of Throwable that wraps this block will 
call poison(), it is imperative that we call poison()
+                    // before the synchronized block is exited. Otherwise, 
another thread could potentially corrupt the journal before
+                    // the poison method closes the file.
+                    poison(t);
+                    throw t;
+                }
             }
 
             logger.debug("Wrote Transaction {} to journal {} with length {} 
and {} records", transactionId, journalFile, baos.size(), records.size());
@@ -343,7 +353,7 @@ public class LengthDelimitedJournal<T> implements 
WriteAheadJournal<T> {
         }
     }
 
-    private void poison(final Throwable t) {
+    protected void poison(final Throwable t) {
         this.poisoned = true;
 
         try {
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
index 448654e..d54ba9c 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
@@ -26,6 +26,8 @@ import org.wali.SerDeFactory;
 import org.wali.SingletonSerDeFactory;
 import org.wali.UpdateType;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -42,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
@@ -419,4 +422,167 @@ public class TestLengthDelimitedJournal {
             assertEquals(secondRecord, retrieved2);
         }
     }
+
+    /**
+     * This test is rather complicated and creates a lot of odd objects with 
Thread.sleep, etc., and it may not be at all clear what is happening. The 
intent of this
+     * test is to try to cause a race condition to occur that would cause the 
journal to become corrupt. Consider the following scenario:
+     *
+     * 1. Thread 1 attempts to update the Journal. In the #update method, it 
checks the state of the Journal, which is healthy.
+     * 2. Thread 2 attempts to update the Journal. In the #update method, it 
checks the state of the Journal, which is healthy.
+     * 3. Thread 1 now throws an Exception when attempting to write to disk (in 
this case, an IOException but could also be an OutOfMemoryError, etc.) The 
Exception is caught by the Journal,
+     *    and the #poison method is called. Before the #poison method is able 
to close the underlying Output Stream, Thread 2 is able to write to the Output 
Stream (which is no longer guarded
+     *    by Thread 1 because of the Exception that was just thrown).
+     * 4. Thread 2 writes to the Journal, after Thread 1 had written only a 
partial update. The repository has now become corrupt.
+     * 5. Thread 1 closes the file handle, but does so too late. Corruption 
has already occurred!
+     *
+     * In order to replicate the above series of steps in a unit test, we need 
to introduce some oddities.
+     *
+     * - The #poison method, when called, needs to wait a bit before actually 
closing the underlying FileOutputStream, so that Thread 2 has a chance to run 
after Thread 1 stops guarding the Output
+     * Stream and before Thread 1 closes the Output Stream. (This is handled 
in the test by subclassing the Journal and overriding the #poison method to 
pause).
+     * - We need to ensure that Thread 1 and Thread 2 are both able to pass 
the #checkState check in #update before the corruption occurs. (This is handled 
in the test by the 'pausingBados' object,
+     * which will enter the #update method, then pause before returning the 
ByteArrayOutputStream, which essentially yields to Thread 1, the 'corrupting 
thread').
+     * - After both threads have passed the #checkState check, we need Thread 
1 to write a partial update, then throw an Exception (This is handled in the 
test by the 'corruptingBados' object).
+     * - After the Exception is thrown, we need Thread 2 to update the 
contents of the repository before the file is closed. (This is handled in the 
test by subclassing the Journal and calling
+     * Thread.sleep in the #poison method).
+     *
+     */
+    @Test
+    public void testFailureDuringWriteCannotCauseCorruption() throws 
IOException, InterruptedException {
+        // Create a ByteArrayDataOutputStream such that when attempting to 
write the data to another OutputStream via its ByteArrayOutputStream,
+        // the BADOS will copy 5 bytes, then throw an IOException. This should 
result in the journal being poisoned such that it can no longer
+        // be written to.
+        final ByteArrayDataOutputStream corruptingBados = new 
ByteArrayDataOutputStream(4096) {
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream() {
+                @Override
+                public synchronized void writeTo(final OutputStream out) 
throws IOException {
+                    out.write(buf, 0, 5);
+                    throw new IOException("Intentional Exception in Unit Test 
designed to cause corruption");
+                }
+
+                @Override
+                public synchronized String toString() {
+                    return "Corrupting ByteArrayOutputStream[" + 
super.toString() + "]";
+                }
+            };
+
+            @Override
+            public ByteArrayOutputStream getByteArrayOutputStream() {
+                return baos;
+            }
+
+            @Override
+            public DataOutputStream getDataOutputStream() {
+                return new DataOutputStream(baos);
+            }
+        };
+
+        // Create a ByteArrayDataOutputStream such that when attempting to 
write the data to another OutputStream via its ByteArrayOutputStream,
+        // the BADOS will sleep for 1 second before writing. This allows other 
threads to trigger corruption in the repo in the meantime.
+        final ByteArrayDataOutputStream pausingBados = new 
ByteArrayDataOutputStream(4096) {
+            private final ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+            private int count = 0;
+
+            @Override
+            public ByteArrayOutputStream getByteArrayOutputStream() {
+                // Pause only on the second iteration.
+                if (count++ == 1) {
+                    try {
+                        Thread.sleep(1000L);
+                    } catch (final InterruptedException ie) {
+                    }
+                }
+
+                return baos;
+            }
+
+            @Override
+            public DataOutputStream getDataOutputStream() {
+                return new DataOutputStream(baos);
+            }
+        };
+
+
+        final Supplier<ByteArrayDataOutputStream> badosSupplier = new 
Supplier<ByteArrayDataOutputStream>() {
+            private int count = 0;
+
+            @Override
+            public ByteArrayDataOutputStream get() {
+                if (count++ == 0) {
+                    return pausingBados;
+                }
+
+                return corruptingBados;
+            }
+        };
+
+
+        final ObjectPool<ByteArrayDataOutputStream> corruptingStreamPool = new 
BlockingQueuePool<>(2,
+            badosSupplier,
+            stream -> true,
+            stream -> stream.getByteArrayOutputStream().reset());
+
+
+        final Thread[] threads = new Thread[2];
+
+        final LengthDelimitedJournal<DummyRecord> journal = new 
LengthDelimitedJournal<DummyRecord>(journalFile, serdeFactory, 
corruptingStreamPool, 0L) {
+            private int count = 0;
+
+            @Override
+            protected void poison(final Throwable t)  {
+                if (count++ == 0) { // it is only important that we sleep the 
first time. If we sleep every time, it just slows the test down.
+                    try {
+                        Thread.sleep(3000L);
+                    } catch (InterruptedException e) {
+                    }
+                }
+
+                super.poison(t);
+            }
+        };
+
+
+        final DummyRecord firstRecord, secondRecord;
+        try {
+            journal.writeHeader();
+
+            firstRecord = new DummyRecord("1", UpdateType.CREATE);
+            secondRecord = new DummyRecord("2", UpdateType.CREATE);
+
+            final Thread t1 = new Thread(() -> {
+                try {
+                    journal.update(Collections.singleton(firstRecord), key -> 
null);
+                } catch (final IOException ioe) {
+                }
+            });
+
+
+            final Thread t2 = new Thread(() -> {
+                try {
+                    journal.update(Collections.singleton(secondRecord), key -> 
firstRecord);
+                } catch (final IOException ioe) {
+                }
+            });
+
+            threads[0] = t1;
+            threads[1] = t2;
+
+            t1.start();
+            t2.start();
+
+            for (Thread thread : threads) {
+                thread.join();
+            }
+        } finally {
+            journal.close();
+        }
+
+        // Now, attempt to read from the Journal to ensure that it is not 
corrupt.
+        try (final LengthDelimitedJournal<DummyRecord> recoveryJournal = new 
LengthDelimitedJournal<>(journalFile, serdeFactory, corruptingStreamPool, 0L)) {
+            final Map<Object, DummyRecord> recordMap = new HashMap<>();
+            final Set<String> swapLocations = new HashSet<>();
+
+            recoveryJournal.recoverRecords(recordMap, swapLocations);
+            assertEquals(0, recordMap.size());
+        }
+    }
 }

Reply via email to