This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new efc1cb0  NIFI-8727: This closes #5418. Addressed bug in which 
ProcessSession doesn't properly decrement claimant count when a FlowFile is 
cloned and then the clone written to. Added automated tests to ensure that we 
are properly handling cases where a FlowFile is cloned and then the contents 
modified
efc1cb0 is described below

commit efc1cb012f95a625e7ae64a56dd733912e53c4a8
Author: Mark Payne <[email protected]>
AuthorDate: Tue Sep 28 13:23:44 2021 -0400

    NIFI-8727: This closes #5418. Addressed bug in which ProcessSession doesn't 
properly decrement claimant count when a FlowFile is cloned and then the clone 
written to. Added automated tests to ensure that we are properly handling cases 
where a FlowFile is cloned and then the contents modified
    
    Signed-off-by: Joe Witt <[email protected]>
---
 .../repository/StandardProcessSession.java         |  11 +-
 .../repository/StandardProcessSessionIT.java       | 203 ++++++++++++++++++++-
 2 files changed, 210 insertions(+), 4 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index a17c06c..eba58ab 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -3075,7 +3075,7 @@ public class StandardProcessSession implements 
ProcessSession, ProvenanceEventEn
         // If the content of the FlowFile has already been modified, we need 
to remove the newly created content (the working claim). However, if
         // they are the same, we cannot just remove the claim because 
record.getWorkingClaim() will return
         // the original claim if the record is "working" but the content has 
not been modified
-        // (e.g., in the case of attributes only were updated)
+        // (e.g., in the case of attributes only were updated).
         //
         // In other words:
         // If we modify the attributes of a FlowFile, and then we call 
record.getWorkingClaim(), this will
@@ -3083,7 +3083,14 @@ public class StandardProcessSession implements 
ProcessSession, ProvenanceEventEn
         // that may decrement the original claim (because the 2 claims are the 
same), and that's NOT what we want to do
         // because we will do that later, in the session.commit() and that 
would result in decrementing the count for
         // the original claim twice.
-        if (record.isContentModified()) {
+        //
+        // Additionally, if the Record Type is CREATE, that means any content 
that exists is a temporary claim.
+        // This will happen, for example, if a FlowFile is created and then 
written to multiple times or a FlowFile is cloned and then the clone's contents
+        // are overwritten. In such a case, we can confidently decrement the 
count/remove the claim because either the claim is null (in which case calling
+        // decrementClaimantCount/addTransientClaim will have no effect), or 
the claim points to the previously written content that is no longer relevant 
because
+        // the FlowFile's content is being overwritten.
+
+        if (record.isContentModified() || record.getType() == 
RepositoryRecordType.CREATE) {
             // In this case, it's ok to decrement the claimant count for the 
content because we know that the working claim is going to be
             // updated and the given working claim is referenced only by 
FlowFiles in this session (because it's the Working Claim).
             // Therefore, we need to decrement the claimant count, and since 
the Working Claim is being changed, that means that
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
index 15f9d46..4e2e145 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
@@ -111,6 +111,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class StandardProcessSessionIT {
+    private static final Relationship FAKE_RELATIONSHIP = new 
Relationship.Builder().name("FAKE").build();
 
     private StandardProcessSession session;
     private MockContentRepository contentRepo;
@@ -123,8 +124,7 @@ public class StandardProcessSessionIT {
     private MockFlowFileRepository flowFileRepo;
     private CounterRepository counterRepository;
     private FlowFileEventRepository flowFileEventRepository;
-    private final Relationship FAKE_RELATIONSHIP = new 
Relationship.Builder().name("FAKE").build();
-    private static StandardResourceClaimManager resourceClaimManager;
+    private ResourceClaimManager resourceClaimManager;
 
     @After
     public void cleanup() {
@@ -2603,6 +2603,205 @@ public class StandardProcessSessionIT {
         stateManager.assertStateNotSet();
     }
 
+    @Test
+    public void testCloneThenRollbackCountsClaimReferencesProperly() throws 
IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+        session.rollback();
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+    }
+
+    @Test
+    public void testCloneThenWriteThenRollbackCountsClaimReferencesProperly() 
throws IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+        clone = session.write(flowFile, out -> out.write("Bye".getBytes()));
+        assertEquals(1, contentRepo.getClaimantCount(getContentClaim(clone)));
+
+        session.rollback();
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+        assertEquals(0, contentRepo.getClaimantCount(getContentClaim(clone)));
+    }
+
+    @Test
+    public void testCloneThenAppendThenRollbackCountsClaimReferencesProperly() 
throws IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+        clone = session.append(flowFile, out -> out.write("Bye".getBytes()));
+        assertEquals(1, contentRepo.getClaimantCount(getContentClaim(clone)));
+
+        session.rollback();
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+        assertEquals(0, contentRepo.getClaimantCount(getContentClaim(clone)));
+    }
+
+    @Test
+    public void testCloneThenWriteCountsClaimReferencesProperly() throws 
IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+
+        // Expect claimant count of 2 because the clone() means that the new 
FlowFile points to the same content claim.
+        assertEquals(2, contentRepo.getClaimantCount(originalClaim));
+
+        // Should be able to write to the FlowFile any number of times, and 
each time it should leave us with a Content Claim Claimant Count of 1 for the 
original (because the new FlowFile will no
+        // longer point at the original claim) and 1 for the new Content Claim.
+        for (int i=0; i < 10; i++) {
+            final ContentClaim previousCloneClaim = getContentClaim(clone);
+            clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+            // After modifying the content of the FlowFile, the claimant count 
of the 'old' content claim should be 1, as should the claimant count of the 
updated content claim.
+            final ContentClaim updatedCloneClaim = getContentClaim(clone);
+            assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+            assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+            assertEquals(1, contentRepo.getClaimantCount(previousCloneClaim));
+        }
+    }
+
+    private ContentClaim getContentClaim(final FlowFile flowFile) {
+        return ((FlowFileRecord) flowFile).getContentClaim();
+    }
+
+    @Test
+    public void testCreateChildThenWriteCountsClaimReferencesProperly() throws 
IOException {
+        final ContentClaim claim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(claim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.create(flowFile);
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+        final ContentClaim updatedCloneClaim = getContentClaim(clone);
+        assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+    }
+
+    @Test
+    public void 
testCreateChildThenMultipleWriteCountsClaimReferencesProperly() throws 
IOException {
+        final ContentClaim claim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(claim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.create(flowFile);
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        for (int i=0; i < 100; i++) {
+            clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+            final ContentClaim updatedCloneClaim = getContentClaim(clone);
+            assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+            assertEquals(1, contentRepo.getClaimantCount(claim));
+        }
+    }
+
+    @Test
+    public void 
testCreateNewFlowFileWithoutParentThenMultipleWritesCountsClaimReferencesProperly()
 {
+        FlowFile flowFile = session.create();
+
+        for (int i=0; i < 100; i++) {
+            flowFile = session.write(flowFile, out -> 
out.write("bye".getBytes()));
+
+            final ContentClaim updatedCloneClaim = getContentClaim(flowFile);
+            assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+        }
+
+        session.rollback();
+        assertEquals(0, 
contentRepo.getClaimantCount(getContentClaim(flowFile)));
+    }
+
+
+
     private static class MockFlowFileRepository implements FlowFileRepository {
 
         private boolean failOnUpdate = false;

Reply via email to