This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new 6929ddecb6 [NIFI-12067] mock process session keeps track of flowfiles created during the session and removes them on rollback rather than putting them on the input queue (#7827) 6929ddecb6 is described below commit 6929ddecb62b247451fc12ca7a82ca33dea7628e Author: Eric Secules <esecu...@gmail.com> AuthorDate: Wed Oct 4 07:48:50 2023 -0700 [NIFI-12067] mock process session keeps track of flowfiles created during the session and removes them on rollback rather than putting them on the input queue (#7827) rename method to be more descriptive, fix checkstyle error for trailing whitespace in TestMockProcessSession.java added session.transfer call to unit tests so that they fail without the fixes. Co-authored-by: Eric Secules <eric.secu...@macrohealth.com> --- .../org/apache/nifi/util/MockProcessSession.java | 48 ++++++++++++++-------- .../apache/nifi/util/TestMockProcessSession.java | 34 +++++++++++++++ 2 files changed, 66 insertions(+), 16 deletions(-) diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index ce0d93094e..c408c44e5b 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -69,6 +69,7 @@ public class MockProcessSession implements ProcessSession { private final Map<Relationship, List<MockFlowFile>> transferMap = new ConcurrentHashMap<>(); private final MockFlowFileQueue processorQueue; private final Set<Long> beingProcessed = new HashSet<>(); + private final Set<Long> created = new HashSet<>(); private final List<MockFlowFile> penalized = new ArrayList<>(); private final Processor processor; @@ -213,6 +214,10 @@ public class MockProcessSession implements ProcessSession { if (removedFlowFiles.remove(flowFile.getId())) { newOwner.removedFlowFiles.add(flowFile.getId()); } + + if (created.remove(flowFile.getId())) { + newOwner.created.add(flowFile.getId()); + } } final Set<String> flowFileIds = flowFiles.stream() @@ -226,8 +231,7 @@ public class MockProcessSession implements ProcessSession { public MockFlowFile clone(FlowFile flowFile) { flowFile = validateState(flowFile); final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile); - currentVersions.put(newFlowFile.getId(), newFlowFile); - beingProcessed.add(newFlowFile.getId()); + updateStateWithNewFlowFile(newFlowFile); return newFlowFile; } @@ -242,8 +246,7 @@ public class MockProcessSession implements ProcessSession { final byte[] newContent = Arrays.copyOfRange(((MockFlowFile) flowFile).getData(), (int) offset, (int) (offset + size)); newFlowFile.setData(newContent); - currentVersions.put(newFlowFile.getId(), newFlowFile); - beingProcessed.add(newFlowFile.getId()); + updateStateWithNewFlowFile(newFlowFile); return newFlowFile; } @@ -290,6 +293,7 @@ public class MockProcessSession implements ProcessSession { beingProcessed.clear(); currentVersions.clear(); originalVersions.clear(); + created.clear(); for (final Map.Entry<String, Long> entry : counterMap.entrySet()) { sharedState.adjustCounter(entry.getKey(), entry.getValue()); @@ -339,8 +343,7 @@ public class MockProcessSession implements ProcessSession { @Override public MockFlowFile create() { final MockFlowFile flowFile = new MockFlowFile(sharedState.nextFlowFileId()); - currentVersions.put(flowFile.getId(), flowFile); - beingProcessed.add(flowFile.getId()); + updateStateWithNewFlowFile(flowFile); return flowFile; } @@ -348,8 +351,7 @@ public class MockProcessSession implements ProcessSession { public MockFlowFile create(final FlowFile flowFile) { MockFlowFile newFlowFile = create(); newFlowFile = (MockFlowFile) inheritAttributes(flowFile, newFlowFile); - currentVersions.put(newFlowFile.getId(), newFlowFile); - beingProcessed.add(newFlowFile.getId()); + updateStateWithNewFlowFile(newFlowFile); return newFlowFile; } @@ -357,8 +359,7 @@ public class MockProcessSession implements ProcessSession { public MockFlowFile create(final Collection<FlowFile> flowFiles) { MockFlowFile newFlowFile = create(); newFlowFile = (MockFlowFile) inheritAttributes(flowFiles, newFlowFile); - currentVersions.put(newFlowFile.getId(), newFlowFile); - beingProcessed.add(newFlowFile.getId()); + updateStateWithNewFlowFile(newFlowFile); return newFlowFile; } @@ -802,9 +803,11 @@ public class MockProcessSession implements ProcessSession { for (final List<MockFlowFile> list : transferMap.values()) { for (final MockFlowFile flowFile : list) { - processorQueue.offer(flowFile); - if (penalize) { - penalized.add(flowFile); + if (!created.contains(flowFile.getId())) { + processorQueue.offer(flowFile); + if (penalize) { + penalized.add(flowFile); + } } } } @@ -812,9 +815,11 @@ public class MockProcessSession implements ProcessSession { for (final Long flowFileId : beingProcessed) { final MockFlowFile flowFile = originalVersions.get(flowFileId); if (flowFile != null) { - processorQueue.offer(flowFile); - if (penalize) { - penalized.add(flowFile); + if (!created.contains(flowFile.getId())) { + processorQueue.offer(flowFile); + if (penalize) { + penalized.add(flowFile); + } } } } @@ -824,6 +829,7 @@ public class MockProcessSession implements ProcessSession { currentVersions.clear(); originalVersions.clear(); transferMap.clear(); + created.clear(); clearTransferState(); if (!penalize) { penalized.clear(); @@ -858,6 +864,15 @@ public class MockProcessSession implements ProcessSession { mockFlowFile.setEnqueuedIndex(enqueuedIndex.incrementAndGet()); } + private void updateStateWithNewFlowFile(MockFlowFile newFlowFile) { + if (newFlowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + currentVersions.put(newFlowFile.getId(), newFlowFile); + beingProcessed.add(newFlowFile.getId()); + created.add(newFlowFile.getId()); + } + @Override public void transfer(final Collection<FlowFile> flowFiles) { for (final FlowFile flowFile : flowFiles) { @@ -1072,6 +1087,7 @@ public class MockProcessSession implements ProcessSession { final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination); newFlowFile.setData(baos.toByteArray()); currentVersions.put(newFlowFile.getId(), newFlowFile); + created.add(newFlowFile.getId()); return newFlowFile; } diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java index 775bc2f5ed..700d0d29b2 100644 --- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java @@ -134,6 +134,40 @@ public class TestMockProcessSession { assertFalse(ff1.isPenalized()); } + @Test + public void testRollbackWithCreatedFlowFile() { + final Processor processor = new PoorlyBehavedProcessor(); + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor)); + final FlowFile ff1 = session.createFlowFile("hello, world".getBytes()); + session.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE); + session.rollback(); + session.assertQueueEmpty(); + } + + @Test + public void testRollbackWithClonedFlowFile() { + final Processor processor = new PoorlyBehavedProcessor(); + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor)); + final FlowFile ff1 = session.createFlowFile("hello, world".getBytes()); + session.clone(ff1); + session.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE); + session.rollback(); + session.assertQueueEmpty(); + } + + @Test + public void testRollbackWithMigratedFlowFile() { + final Processor processor = new PoorlyBehavedProcessor(); + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor)); + final MockProcessSession newSession = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor)); + final FlowFile ff1 = session.createFlowFile("hello, world".getBytes()); + session.migrate(newSession); + newSession.transfer(ff1, PoorlyBehavedProcessor.REL_FAILURE); + newSession.rollback(); + session.assertQueueEmpty(); + newSession.assertQueueEmpty(); + } + @Test public void testAttributePreservedAfterWrite() throws IOException { final Processor processor = new PoorlyBehavedProcessor();