This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 4346dd8  NIFI-9391: Modified MergeRecord to process FlowFiles within a 
loop in… (#5550)
4346dd8 is described below

commit 4346dd8faf692968716669117e799afadc60150c
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Tue Jan 4 17:36:56 2022 +0100

    NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in… 
(#5550)
    
    * NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in a 
single onTrigger
    
    MergeRecord processed the FlowFiles across multiple onTrigger calls and it needed
an extra onTrigger call
    (with no incoming FFs) to realize that no more FFs were available and that it was
time to send the merged FF downstream.
    It was not compatible with the Stateless Runtime, which does not trigger the
flow any more if no FFs are available.
    
    Also changed the "unschedule" logic in StandardProcessorTestRunner:
@OnUnscheduled methods were called immediately after
    the 1st FlowFile was processed. Now the processor is unscheduled only at the end of
the execution (after onTrigger has finished)
    and only if stopOnFinish has been requested by the test case.
---
 .../nifi/util/StandardProcessorTestRunner.java     |  4 +-
 .../nifi/processors/standard/MergeRecord.java      | 88 ++++++++++++----------
 .../nifi/processors/standard/merge/RecordBin.java  |  7 +-
 .../nifi/processors/standard/TestMergeRecord.java  | 19 ++---
 4 files changed, 61 insertions(+), 57 deletions(-)

diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 27d642e..922907c 100644
--- 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -230,7 +230,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
                         throw new AssertionError(thrown);
                     }
 
-                    if (++finishedCount == 1) {
+                    if (++finishedCount == 1 && stopOnFinish) {
                         unscheduledRun = true;
                         unSchedule();
                     }
@@ -238,7 +238,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
                 }
             }
 
-            if (!unscheduledRun) {
+            if (!unscheduledRun && stopOnFinish) {
                 unSchedule();
             }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index 53a5e5d..982303f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -323,56 +323,64 @@ public class MergeRecord extends 
AbstractSessionFactoryProcessor {
             }
         }
 
-        final ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> flowFiles = 
session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
-        if (getLogger().isDebugEnabled()) {
-            final List<String> ids = flowFiles.stream().map(ff -> "id=" + 
ff.getId()).collect(Collectors.toList());
-            getLogger().debug("Pulled {} FlowFiles from queue: {}", new 
Object[] {ids.size(), ids});
-        }
+        while (isScheduled()) {
+            final ProcessSession session = sessionFactory.createSession();
+            final List<FlowFile> flowFiles = 
session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+            if (getLogger().isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + 
ff.getId()).collect(Collectors.toList());
+                getLogger().debug("Pulled {} FlowFiles from queue: {}", 
ids.size(), ids);
+            }
 
-        final String mergeStrategy = 
context.getProperty(MERGE_STRATEGY).getValue();
-        final boolean block;
-        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            block = true;
-        } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
-            block = true;
-        } else {
-            block = false;
-        }
+            final String mergeStrategy = 
context.getProperty(MERGE_STRATEGY).getValue();
+            final boolean block;
+            if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+                block = true;
+            } else if 
(context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+                block = true;
+            } else {
+                block = false;
+            }
 
-        try {
-            for (final FlowFile flowFile : flowFiles) {
-                try {
-                    binFlowFile(context, flowFile, session, manager, block);
-                } catch (final Exception e) {
-                    getLogger().error("Failed to bin {} due to {}", new 
Object[] {flowFile, e});
-                    session.transfer(flowFile, REL_FAILURE);
+            try {
+                for (final FlowFile flowFile : flowFiles) {
+                    try {
+                        binFlowFile(context, flowFile, session, manager, 
block);
+                    } catch (final Exception e) {
+                        getLogger().error("Failed to bin {} due to {}", 
flowFile, e, e);
+                        session.transfer(flowFile, REL_FAILURE);
+                    }
                 }
+            } finally {
+                session.commitAsync();
             }
-        } finally {
-            session.commitAsync();
-        }
 
-        // If there is no more data queued up, or strategy is defragment, 
complete any bin that meets our minimum threshold
-        // Otherwise, run one more cycle to process queued FlowFiles to add 
more fragment into available bins.
-        int completedBins = 0;
-        if (flowFiles.isEmpty() || 
MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+            // Complete any bins that have reached their expiration date
             try {
-                completedBins += manager.completeFullEnoughBins();
+                manager.completeExpiredBins();
             } catch (final Exception e) {
-                getLogger().error("Failed to merge FlowFiles to create new bin 
due to " + e, e);
+                getLogger().error("Failed to merge FlowFiles to create new bin 
due to {}", e, e);
             }
         }
 
-        // Complete any bins that have reached their expiration date
-        try {
-            completedBins += manager.completeExpiredBins();
-        } catch (final Exception e) {
-            getLogger().error("Failed to merge FlowFiles to create new bin due 
to " + e, e);
-        }
+        if (isScheduled()) {
+            // Complete any bins that have reached their expiration date
+            try {
+                manager.completeExpiredBins();
+            } catch (final Exception e) {
+                getLogger().error("Failed to merge FlowFiles to create new bin 
due to {}", e, e);
+            }
+
+            // Complete any bins that meet their minimum size requirements
+            try {
+                manager.completeFullEnoughBins();
+            } catch (final Exception e) {
+                getLogger().error("Failed to merge FlowFiles to create new bin 
due to {}", e, e);
+            }
 
-        if (completedBins == 0 && flowFiles.isEmpty()) {
-            getLogger().debug("No FlowFiles to bin; will yield");
+            getLogger().debug("No more FlowFiles to bin; will yield");
             context.yield();
         }
     }
@@ -386,7 +394,7 @@ public class MergeRecord extends 
AbstractSessionFactoryProcessor {
             final RecordSchema schema = reader.getSchema();
 
             final String groupId = getGroupId(context, flowFile, schema, 
session);
-            getLogger().debug("Got Group ID {} for {}", new Object[] {groupId, 
flowFile});
+            getLogger().debug("Got Group ID {} for {}", groupId, flowFile);
 
             binManager.add(groupId, flowFile, reader, session, block);
         } catch (MalformedRecordException | IOException | 
SchemaNotFoundException e) {
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
index 6a0293b..7dfffb7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -205,6 +205,11 @@ public class RecordBin {
                 return false;
             }
 
+            if (thresholds.getFragmentCountAttribute().isPresent()) {
+                // Defragment strategy: Compare with the target fragment count.
+                return this.fragmentCount == thresholds.getFragmentCount();
+            }
+
             int maxRecords = thresholds.getMaxRecords();
 
             if (recordCount >= maxRecords) {
@@ -241,7 +246,7 @@ public class RecordBin {
             }
 
             if (thresholds.getFragmentCountAttribute().isPresent()) {
-                // Compare with the target fragment count.
+                // Defragment strategy: Compare with the target fragment count.
                 return this.fragmentCount == thresholds.getFragmentCount();
             }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index f6ca6a1..ea3ace6 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -78,13 +78,10 @@ public class TestMergeRecord {
 
     @Test
     public void testMergeSimple() {
-        runner.setProperty(MergeRecord.MIN_RECORDS, "2");
-        runner.setProperty(MergeRecord.MAX_RECORDS, "2");
-
         runner.enqueue("Name, Age\nJohn, 35");
         runner.enqueue("Name, Age\nJane, 34");
 
-        runner.run(2);
+        runner.run(1);
         runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
 
@@ -101,12 +98,11 @@ public class TestMergeRecord {
     @Test
     public void testDifferentSchema() {
         runner.setProperty(MergeRecord.MIN_RECORDS, "2");
-        runner.setProperty(MergeRecord.MAX_RECORDS, "2");
 
         runner.enqueue("Name, Age\nJohn, 35");
         runner.enqueue("Name, Color\nJane, Red");
 
-        runner.run(2, false, true);
+        runner.run(1, false, true);
 
         runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
@@ -114,7 +110,7 @@ public class TestMergeRecord {
         runner.enqueue("Name, Age\nJane, 34");
         runner.enqueue("Name, Color\nJohn, Blue");
 
-        runner.run(2, true, false);
+        runner.run(1, true, false);
 
         runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
@@ -133,8 +129,7 @@ public class TestMergeRecord {
 
     @Test
     public void testFailureToParse() {
-        runner.setProperty(MergeRecord.MIN_RECORDS, "2");
-        runner.setProperty(MergeRecord.MAX_RECORDS, "3");
+        runner.setProperty(MergeRecord.MIN_RECORDS, "3");
 
         readerService.failAfter(2);
 
@@ -329,7 +324,6 @@ public class TestMergeRecord {
     @Test
     public void testMinSize() {
         runner.setProperty(MergeRecord.MIN_RECORDS, "2");
-        runner.setProperty(MergeRecord.MAX_RECORDS, "2");
         runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
 
         runner.enqueue("Name, Age\nJohn, 35");
@@ -367,7 +361,6 @@ public class TestMergeRecord {
     @Test
     public void testMinRecords() {
         runner.setProperty(MergeRecord.MIN_RECORDS, "103");
-        runner.setProperty(MergeRecord.MAX_RECORDS, "110");
         runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
 
         runner.enqueue("Name, Age\nJohn, 35");
@@ -384,7 +377,7 @@ public class TestMergeRecord {
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
 
         runner.enqueue("Name, Age\nJohn, 35");
-        runner.run(2);
+        runner.run();
         runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
     }
@@ -477,8 +470,6 @@ public class TestMergeRecord {
 
     @Test
     public void testDefragmentOldestBinFailsWhenTooManyBins() {
-        runner.setProperty(MergeRecord.MIN_RECORDS, "5");
-        runner.setProperty(MergeRecord.MAX_RECORDS, "10");
         runner.setProperty(MergeRecord.MAX_BIN_COUNT, "5");
         runner.setProperty(MergeRecord.MERGE_STRATEGY, 
MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
 

Reply via email to