This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 58a25cf  NIFI-6110: Updated StandardProcessSession such that if we 
fail to update the FlowFile Repository, we do not decrement claimant counts for 
any FlowFiles that were removed. Doing so can cause an issue where a FlowFile 
is removed, then the FlowFileRepo update fails, resulting in the flowfile being 
rolled back, but after its claimant count is decremented. It will then be 
processed again, which can result in the same thing, and we'll end up 
decrementing the claimant count rep [...]
58a25cf is described below

commit 58a25cfa5a9b9e4af9a5e9b7e5c3be9a676d8e87
Author: Mark Payne <[email protected]>
AuthorDate: Fri Mar 8 08:55:53 2019 -0500

    NIFI-6110: Updated StandardProcessSession such that if we fail to update 
the FlowFile Repository, we do not decrement claimant counts for any FlowFiles 
that were removed. Doing so can cause an issue where a FlowFile is removed, 
then the FlowFileRepo update fails, resulting in the flowfile being rolled 
back, but after its claimant count is decremented. It will then be processed 
again, which can result in the same thing, and we'll end up decrementing the 
claimant count repeatedly. Also  [...]
    
    This closes #3358.
    
    Signed-off-by: Bryan Bende <[email protected]>
---
 .../apache/nifi/wali/LengthDelimitedJournal.java   |  8 +-
 .../nifi/wali/TestLengthDelimitedJournal.java      | 93 +++++++++++++++++++---
 .../src/test/java/org/wali/DummyRecordSerde.java   | 14 ++--
 .../repository/StandardProcessSession.java         | 38 ++++-----
 .../repository/TestStandardProcessSession.java     | 55 +++++++++++++
 5 files changed, 170 insertions(+), 38 deletions(-)

diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
index d9fdc97..df7868d 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
@@ -40,6 +40,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.text.DecimalFormat;
 import java.util.Collection;
 import java.util.HashMap;
@@ -218,6 +219,11 @@ public class LengthDelimitedJournal<T> implements 
WriteAheadJournal<T> {
     }
 
 
+    // Visible/overridable for testing.
+    protected void createOverflowDirectory(final Path path) throws IOException 
{
+        Files.createDirectories(path);
+    }
+
     @Override
     public void update(final Collection<T> records, final RecordLookup<T> 
recordLookup) throws IOException {
         if (!headerWritten) {
@@ -246,7 +252,7 @@ public class LengthDelimitedJournal<T> implements 
WriteAheadJournal<T> {
                     final int size = bados.getByteArrayOutputStream().size();
                     if (serde.isWriteExternalFileReferenceSupported() && size 
> maxInHeapSerializationBytes) {
                         if (!overflowDirectory.exists()) {
-                            Files.createDirectory(overflowDirectory.toPath());
+                            
createOverflowDirectory(overflowDirectory.toPath());
                         }
 
                         // If we have exceeded our threshold for how much to 
serialize in memory,
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
index 94df890..448654e 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java
@@ -17,16 +17,21 @@
 
 package org.apache.nifi.wali;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.wali.DummyRecord;
+import org.wali.DummyRecordSerde;
+import org.wali.SerDeFactory;
+import org.wali.SingletonSerDeFactory;
+import org.wali.UpdateType;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -36,15 +41,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.wali.DummyRecord;
-import org.wali.DummyRecordSerde;
-import org.wali.SerDeFactory;
-import org.wali.SingletonSerDeFactory;
-import org.wali.UpdateType;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class TestLengthDelimitedJournal {
     private final File journalFile = new 
File("target/testLengthDelimitedJournal/testJournal.journal");
@@ -308,6 +312,71 @@ public class TestLengthDelimitedJournal {
     }
 
     @Test
+    public void testMultipleThreadsCreatingOverflowDirectory() throws 
IOException, InterruptedException {
+        final LengthDelimitedJournal<DummyRecord> journal = new 
LengthDelimitedJournal<DummyRecord>(journalFile, serdeFactory, streamPool, 
3820L, 100) {
+            @Override
+            protected void createOverflowDirectory(final Path path) throws 
IOException {
+                // Create the overflow directory.
+                super.createOverflowDirectory(path);
+
+                // Ensure that a second call to create the overflow directory 
will not cause an issue.
+                super.createOverflowDirectory(path);
+            }
+        };
+
+        // Ensure that the overflow directory does not exist.
+        journal.dispose();
+
+        try {
+            journal.writeHeader();
+
+            final List<DummyRecord> largeCollection1 = new ArrayList<>();
+            for (int i=0; i < 1_000; i++) {
+                largeCollection1.add(new DummyRecord(String.valueOf(i), 
UpdateType.CREATE));
+            }
+            final Map<String, DummyRecord> recordMap = 
largeCollection1.stream()
+                .collect(Collectors.toMap(DummyRecord::getId, rec -> rec));
+
+            final List<DummyRecord> largeCollection2 = new ArrayList<>();
+            for (int i=0; i < 1_000; i++) {
+                largeCollection2.add(new DummyRecord(String.valueOf(5_000_000 
+ i), UpdateType.CREATE));
+            }
+            final Map<String, DummyRecord> recordMap2 = 
largeCollection2.stream()
+                .collect(Collectors.toMap(DummyRecord::getId, rec -> rec));
+
+            final AtomicReference<Exception> thread1Failure = new 
AtomicReference<>();
+            final Thread t1 = new Thread(() -> {
+                try {
+                    journal.update(largeCollection1, recordMap::get);
+                } catch (final Exception e) {
+                    e.printStackTrace();
+                    thread1Failure.set(e);
+                }
+            });
+            t1.start();
+
+            final AtomicReference<Exception> thread2Failure = new 
AtomicReference<>();
+            final Thread t2 = new Thread(() -> {
+                try {
+                    journal.update(largeCollection2, recordMap2::get);
+                } catch (final Exception e) {
+                    e.printStackTrace();
+                    thread2Failure.set(e);
+                }
+            });
+            t2.start();
+
+            t1.join();
+            t2.join();
+
+            assertNull(thread1Failure.get());
+            assertNull(thread2Failure.get());
+        } finally {
+            journal.close();
+        }
+    }
+
+    @Test
     public void testTruncatedJournalFile() throws IOException {
         final DummyRecord firstRecord, secondRecord;
         try (final LengthDelimitedJournal<DummyRecord> journal = new 
LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
index 9203493..cc8a4c1 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
@@ -43,7 +43,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
 
     @SuppressWarnings("fallthrough")
     @Override
-    public void serializeEdit(final DummyRecord previousState, final 
DummyRecord record, final DataOutputStream out) throws IOException {
+    public synchronized void serializeEdit(final DummyRecord previousState, 
final DummyRecord record, final DataOutputStream out) throws IOException {
         if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= 
throwIOEAfterNserializeEdits)) {
             throw new IOException("Serialized " + (serializeEditCount - 1) + " 
records successfully, so now it's time to throw IOE");
         }
@@ -80,13 +80,13 @@ public class DummyRecordSerde implements SerDe<DummyRecord> 
{
     }
 
     @Override
-    public void serializeRecord(final DummyRecord record, final 
DataOutputStream out) throws IOException {
+    public synchronized void serializeRecord(final DummyRecord record, final 
DataOutputStream out) throws IOException {
         serializeEdit(null, record, out);
     }
 
     @Override
     @SuppressWarnings("fallthrough")
-    public DummyRecord deserializeRecord(final DataInputStream in, final int 
version) throws IOException {
+    public synchronized DummyRecord deserializeRecord(final DataInputStream 
in, final int version) throws IOException {
         if (externalRecords != null) {
             final DummyRecord record = externalRecords.poll();
             if (record != null) {
@@ -122,7 +122,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> 
{
     }
 
     @Override
-    public boolean isMoreInExternalFile() {
+    public synchronized boolean isMoreInExternalFile() {
         return externalRecords != null && !externalRecords.isEmpty();
     }
 
@@ -189,11 +189,11 @@ public class DummyRecordSerde implements 
SerDe<DummyRecord> {
         return 1;
     }
 
-    public void setThrowIOEAfterNSerializeEdits(final int n) {
+    public synchronized void setThrowIOEAfterNSerializeEdits(final int n) {
         this.throwIOEAfterNserializeEdits = n;
     }
 
-    public void setThrowOOMEAfterNSerializeEdits(final int n) {
+    public synchronized void setThrowOOMEAfterNSerializeEdits(final int n) {
         this.throwOOMEAfterNserializeEdits = n;
     }
 
@@ -208,7 +208,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> 
{
     }
 
     @Override
-    public void writeExternalFileReference(final File externalFile, final 
DataOutputStream out) throws IOException {
+    public synchronized void writeExternalFileReference(final File 
externalFile, final DataOutputStream out) throws IOException {
         out.write(EXTERNAL_FILE_INDICATOR);
         out.writeUTF(externalFile.getAbsolutePath());
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 604eb7e..7a87678 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -359,8 +359,25 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             final long updateProvenanceStart = System.nanoTime();
             updateProvenanceRepo(checkpoint);
 
-            final long claimRemovalStart = System.nanoTime();
-            final long updateProvenanceNanos = claimRemovalStart - 
updateProvenanceStart;
+            final long flowFileRepoUpdateStart = System.nanoTime();
+            final long updateProvenanceNanos = flowFileRepoUpdateStart - 
updateProvenanceStart;
+
+            // Update the FlowFile Repository
+            try {
+                final Collection<StandardRepositoryRecord> repoRecords = 
checkpoint.records.values();
+                context.getFlowFileRepository().updateRepository((Collection) 
repoRecords);
+            } catch (final IOException ioe) {
+                // if we fail to commit the session, we need to roll back
+                // the checkpoints as well because none of the checkpoints
+                // were ever committed.
+                rollback(false, true);
+                throw new ProcessException("FlowFile Repository failed to 
update", ioe);
+            }
+
+            final long flowFileRepoUpdateFinishNanos = System.nanoTime();
+            final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos 
- flowFileRepoUpdateStart;
+
+            final long claimRemovalStart = flowFileRepoUpdateFinishNanos;
 
             /**
              * Figure out which content claims can be released. At this point,
@@ -401,25 +418,10 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             final long claimRemovalFinishNanos = System.nanoTime();
             final long claimRemovalNanos = claimRemovalFinishNanos - 
claimRemovalStart;
 
-            // Update the FlowFile Repository
-            try {
-                final Collection<StandardRepositoryRecord> repoRecords = 
checkpoint.records.values();
-                context.getFlowFileRepository().updateRepository((Collection) 
repoRecords);
-            } catch (final IOException ioe) {
-                // if we fail to commit the session, we need to roll back
-                // the checkpoints as well because none of the checkpoints
-                // were ever committed.
-                rollback(false, true);
-                throw new ProcessException("FlowFile Repository failed to 
update", ioe);
-            }
-
-            final long flowFileRepoUpdateFinishNanos = System.nanoTime();
-            final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos 
- claimRemovalFinishNanos;
-
             updateEventRepository(checkpoint);
 
             final long updateEventRepositoryFinishNanos = System.nanoTime();
-            final long updateEventRepositoryNanos = 
updateEventRepositoryFinishNanos - flowFileRepoUpdateFinishNanos;
+            final long updateEventRepositoryNanos = 
updateEventRepositoryFinishNanos - claimRemovalFinishNanos;
 
             // transfer the flowfiles to the connections' queues.
             final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = 
new HashMap<>();
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index d21cc5c..b001e79 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -92,6 +92,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.notNull;
@@ -325,6 +326,60 @@ public class TestStandardProcessSession {
 
 
     @Test
+    public void testUpdateFlowFileRepoFailsOnSessionCommit() throws 
IOException {
+        final ContentClaim contentClaim = 
contentRepo.create("original".getBytes());
+
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(8L)
+            .contentClaim(contentClaim)
+            .build();
+
+        flowFileQueue.put(flowFileRecord);
+
+        final Relationship relationship = new 
Relationship.Builder().name("A").build();
+
+        FlowFile ff1 = session.get();
+        assertNotNull(ff1);
+
+        // Fork a child FlowFile.
+        final FlowFile child = session.create(flowFileRecord);
+        final FlowFile updated = session.write(flowFileRecord, out -> 
out.write("update".getBytes()));
+        final ContentClaim updatedContentClaim = ((FlowFileRecord) 
updated).getContentClaim();
+
+        session.remove(updated);
+        final FlowFile updatedChild = session.write(child, out -> 
out.write("hello".getBytes(StandardCharsets.UTF_8)));
+        session.transfer(updatedChild, relationship);
+
+        final ContentClaim childContentClaim = ((FlowFileRecord) 
updatedChild).getContentClaim();
+
+        flowFileRepo.setFailOnUpdate(true);
+
+        assertEquals(1, contentRepo.getClaimantCount(contentClaim));
+
+        // these will be the same content claim due to how the 
StandardProcessSession adds multiple FlowFiles' contents to a single claim.
+        assertSame(updatedContentClaim, childContentClaim);
+        assertEquals(2, contentRepo.getClaimantCount(childContentClaim));
+
+        try {
+            session.commit();
+            Assert.fail("Expected session commit to fail");
+        } catch (final ProcessException pe) {
+            // Expected
+        }
+
+        // Ensure that if we fail to update the flowfile repo, the 
claimant count of the 'original' flowfile, which was removed, does not get 
decremented.
+        assertEquals(1, contentRepo.getClaimantCount(contentClaim));
+        assertEquals(0, contentRepo.getClaimantCount(updatedContentClaim)); // 
temporary claim should be cleaned up.
+        assertEquals(0, contentRepo.getClaimantCount(childContentClaim)); // 
temporary claim should be cleaned up.
+
+        assertEquals(1, flowFileQueue.size().getObjectCount());
+        assertEquals(8L, flowFileQueue.size().getByteCount());
+    }
+
+    @Test
     public void testCloneOriginalDataSmaller() throws IOException {
         final byte[] originalContent = "hello".getBytes();
         final byte[] replacementContent = "NEW DATA".getBytes();

Reply via email to