This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 6929ddecb6 [NIFI-12067] mock process session keeps track of flowfiles 
created during the session and removes them on rollback rather than putting 
them on the input queue (#7827)
6929ddecb6 is described below

commit 6929ddecb62b247451fc12ca7a82ca33dea7628e
Author: Eric Secules <esecu...@gmail.com>
AuthorDate: Wed Oct 4 07:48:50 2023 -0700

    [NIFI-12067] mock process session keeps track of flowfiles created during 
the session and removes them on rollback rather than putting them on the input 
queue (#7827)
    
    rename method to be more descriptive, fix checkstyle error for trailing 
whitespace in TestMockProcessSession.java
    added session.transfer call to unit tests so that they fail without the 
fixes.
    
    Co-authored-by: Eric Secules <eric.secu...@macrohealth.com>
---
 .../org/apache/nifi/util/MockProcessSession.java   | 48 ++++++++++++++--------
 .../apache/nifi/util/TestMockProcessSession.java   | 34 +++++++++++++++
 2 files changed, 66 insertions(+), 16 deletions(-)

diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index ce0d93094e..c408c44e5b 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -69,6 +69,7 @@ public class MockProcessSession implements ProcessSession {
     private final Map<Relationship, List<MockFlowFile>> transferMap = new 
ConcurrentHashMap<>();
     private final MockFlowFileQueue processorQueue;
     private final Set<Long> beingProcessed = new HashSet<>();
+    private final Set<Long> created = new HashSet<>();
     private final List<MockFlowFile> penalized = new ArrayList<>();
     private final Processor processor;
 
@@ -213,6 +214,10 @@ public class MockProcessSession implements ProcessSession {
             if (removedFlowFiles.remove(flowFile.getId())) {
                 newOwner.removedFlowFiles.add(flowFile.getId());
             }
+
+            if (created.remove(flowFile.getId())) {
+                newOwner.created.add(flowFile.getId());
+            }
         }
 
         final Set<String> flowFileIds = flowFiles.stream()
@@ -226,8 +231,7 @@ public class MockProcessSession implements ProcessSession {
     public MockFlowFile clone(FlowFile flowFile) {
         flowFile = validateState(flowFile);
         final MockFlowFile newFlowFile = new 
MockFlowFile(sharedState.nextFlowFileId(), flowFile);
-        currentVersions.put(newFlowFile.getId(), newFlowFile);
-        beingProcessed.add(newFlowFile.getId());
+        updateStateWithNewFlowFile(newFlowFile);
         return newFlowFile;
     }
 
@@ -242,8 +246,7 @@ public class MockProcessSession implements ProcessSession {
         final byte[] newContent = Arrays.copyOfRange(((MockFlowFile) 
flowFile).getData(), (int) offset, (int) (offset + size));
         newFlowFile.setData(newContent);
 
-        currentVersions.put(newFlowFile.getId(), newFlowFile);
-        beingProcessed.add(newFlowFile.getId());
+        updateStateWithNewFlowFile(newFlowFile);
         return newFlowFile;
     }
 
@@ -290,6 +293,7 @@ public class MockProcessSession implements ProcessSession {
         beingProcessed.clear();
         currentVersions.clear();
         originalVersions.clear();
+        created.clear();
 
         for (final Map.Entry<String, Long> entry : counterMap.entrySet()) {
             sharedState.adjustCounter(entry.getKey(), entry.getValue());
@@ -339,8 +343,7 @@ public class MockProcessSession implements ProcessSession {
     @Override
     public MockFlowFile create() {
         final MockFlowFile flowFile = new 
MockFlowFile(sharedState.nextFlowFileId());
-        currentVersions.put(flowFile.getId(), flowFile);
-        beingProcessed.add(flowFile.getId());
+        updateStateWithNewFlowFile(flowFile);
         return flowFile;
     }
 
@@ -348,8 +351,7 @@ public class MockProcessSession implements ProcessSession {
     public MockFlowFile create(final FlowFile flowFile) {
         MockFlowFile newFlowFile = create();
         newFlowFile = (MockFlowFile) inheritAttributes(flowFile, newFlowFile);
-        currentVersions.put(newFlowFile.getId(), newFlowFile);
-        beingProcessed.add(newFlowFile.getId());
+        updateStateWithNewFlowFile(newFlowFile);
         return newFlowFile;
     }
 
@@ -357,8 +359,7 @@ public class MockProcessSession implements ProcessSession {
     public MockFlowFile create(final Collection<FlowFile> flowFiles) {
         MockFlowFile newFlowFile = create();
         newFlowFile = (MockFlowFile) inheritAttributes(flowFiles, newFlowFile);
-        currentVersions.put(newFlowFile.getId(), newFlowFile);
-        beingProcessed.add(newFlowFile.getId());
+        updateStateWithNewFlowFile(newFlowFile);
         return newFlowFile;
     }
 
@@ -802,9 +803,11 @@ public class MockProcessSession implements ProcessSession {
 
         for (final List<MockFlowFile> list : transferMap.values()) {
             for (final MockFlowFile flowFile : list) {
-                processorQueue.offer(flowFile);
-                if (penalize) {
-                    penalized.add(flowFile);
+                if (!created.contains(flowFile.getId())) {
+                    processorQueue.offer(flowFile);
+                    if (penalize) {
+                        penalized.add(flowFile);
+                    }
                 }
             }
         }
@@ -812,9 +815,11 @@ public class MockProcessSession implements ProcessSession {
         for (final Long flowFileId : beingProcessed) {
             final MockFlowFile flowFile = originalVersions.get(flowFileId);
             if (flowFile != null) {
-                processorQueue.offer(flowFile);
-                if (penalize) {
-                    penalized.add(flowFile);
+                if (!created.contains(flowFile.getId())) {
+                    processorQueue.offer(flowFile);
+                    if (penalize) {
+                        penalized.add(flowFile);
+                    }
                 }
             }
         }
@@ -824,6 +829,7 @@ public class MockProcessSession implements ProcessSession {
         currentVersions.clear();
         originalVersions.clear();
         transferMap.clear();
+        created.clear();
         clearTransferState();
         if (!penalize) {
             penalized.clear();
@@ -858,6 +864,15 @@ public class MockProcessSession implements ProcessSession {
         mockFlowFile.setEnqueuedIndex(enqueuedIndex.incrementAndGet());
     }
 
+    private void updateStateWithNewFlowFile(MockFlowFile newFlowFile) {
+        if (newFlowFile == null) {
+            throw new IllegalArgumentException("argument cannot be null");
+        }
+        currentVersions.put(newFlowFile.getId(), newFlowFile);
+        beingProcessed.add(newFlowFile.getId());
+        created.add(newFlowFile.getId());
+    }
+
     @Override
     public void transfer(final Collection<FlowFile> flowFiles) {
         for (final FlowFile flowFile : flowFiles) {
@@ -1072,6 +1087,7 @@ public class MockProcessSession implements ProcessSession 
{
         final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), 
destination);
         newFlowFile.setData(baos.toByteArray());
         currentVersions.put(newFlowFile.getId(), newFlowFile);
+        created.add(newFlowFile.getId());
 
         return newFlowFile;
     }
diff --git 
a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java 
b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
index 775bc2f5ed..700d0d29b2 100644
--- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
+++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
@@ -134,6 +134,40 @@ public class TestMockProcessSession {
         assertFalse(ff1.isPenalized());
     }
 
+    @Test
+    public void testRollbackWithCreatedFlowFile() {
+        final Processor processor = new PoorlyBehavedProcessor();
+        final MockProcessSession session = new MockProcessSession(new 
SharedSessionState(processor, new AtomicLong(0L)), processor, new 
MockStateManager(processor));
+        final FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
+        session.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE);
+        session.rollback();
+        session.assertQueueEmpty();
+    }
+
+    @Test
+    public void testRollbackWithClonedFlowFile() {
+        final Processor processor = new PoorlyBehavedProcessor();
+        final MockProcessSession session = new MockProcessSession(new 
SharedSessionState(processor, new AtomicLong(0L)), processor, new 
MockStateManager(processor));
+        final FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
+        session.clone(ff1);
+        session.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE);
+        session.rollback();
+        session.assertQueueEmpty();
+    }
+
+    @Test
+    public void testRollbackWithMigratedFlowFile() {
+        final Processor processor = new PoorlyBehavedProcessor();
+        final MockProcessSession session = new MockProcessSession(new 
SharedSessionState(processor, new AtomicLong(0L)), processor, new 
MockStateManager(processor));
+        final MockProcessSession newSession = new MockProcessSession(new 
SharedSessionState(processor, new AtomicLong(0L)), processor, new 
MockStateManager(processor));
+        final FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
+        session.migrate(newSession);
+        newSession.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE);
+        newSession.rollback();
+        session.assertQueueEmpty();
+        newSession.assertQueueEmpty();
+    }
+
     @Test
     public void testAttributePreservedAfterWrite() throws IOException {
         final Processor processor = new PoorlyBehavedProcessor();

Reply via email to