This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 4346dd8 NIFI-9391: Modified MergeRecord to process FlowFiles within a
loop in… (#5550)
4346dd8 is described below
commit 4346dd8faf692968716669117e799afadc60150c
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Tue Jan 4 17:36:56 2022 +0100
NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in…
(#5550)
* NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in a
single onTrigger
MergeRecord processed the FlowFiles across multiple onTrigger calls, and it needed
an extra onTrigger call
(with no incoming FFs) to realize that no more FFs are available and that it is
time to send the merged FF downstream.
This was not compatible with the Stateless Runtime, which does not trigger the
flow any more once no FFs are available.
Also changed the "unschedule" logic in StandardProcessorTestRunner:
previously, @OnUnscheduled methods were called immediately after
the 1st FlowFile was processed. The processor is now unscheduled only at the end
of the execution (after onTrigger finishes),
and only if stopOnFinish has been requested by the test case.
---
.../nifi/util/StandardProcessorTestRunner.java | 4 +-
.../nifi/processors/standard/MergeRecord.java | 88 ++++++++++++----------
.../nifi/processors/standard/merge/RecordBin.java | 7 +-
.../nifi/processors/standard/TestMergeRecord.java | 19 ++---
4 files changed, 61 insertions(+), 57 deletions(-)
diff --git
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 27d642e..922907c 100644
---
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -230,7 +230,7 @@ public class StandardProcessorTestRunner implements
TestRunner {
throw new AssertionError(thrown);
}
- if (++finishedCount == 1) {
+ if (++finishedCount == 1 && stopOnFinish) {
unscheduledRun = true;
unSchedule();
}
@@ -238,7 +238,7 @@ public class StandardProcessorTestRunner implements
TestRunner {
}
}
- if (!unscheduledRun) {
+ if (!unscheduledRun && stopOnFinish) {
unSchedule();
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index 53a5e5d..982303f 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -323,56 +323,64 @@ public class MergeRecord extends
AbstractSessionFactoryProcessor {
}
}
- final ProcessSession session = sessionFactory.createSession();
- final List<FlowFile> flowFiles =
session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
- if (getLogger().isDebugEnabled()) {
- final List<String> ids = flowFiles.stream().map(ff -> "id=" +
ff.getId()).collect(Collectors.toList());
- getLogger().debug("Pulled {} FlowFiles from queue: {}", new
Object[] {ids.size(), ids});
- }
+ while (isScheduled()) {
+ final ProcessSession session = sessionFactory.createSession();
+ final List<FlowFile> flowFiles =
session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+ if (flowFiles.isEmpty()) {
+ break;
+ }
+ if (getLogger().isDebugEnabled()) {
+ final List<String> ids = flowFiles.stream().map(ff -> "id=" +
ff.getId()).collect(Collectors.toList());
+ getLogger().debug("Pulled {} FlowFiles from queue: {}",
ids.size(), ids);
+ }
- final String mergeStrategy =
context.getProperty(MERGE_STRATEGY).getValue();
- final boolean block;
- if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
- block = true;
- } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
- block = true;
- } else {
- block = false;
- }
+ final String mergeStrategy =
context.getProperty(MERGE_STRATEGY).getValue();
+ final boolean block;
+ if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+ block = true;
+ } else if
(context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+ block = true;
+ } else {
+ block = false;
+ }
- try {
- for (final FlowFile flowFile : flowFiles) {
- try {
- binFlowFile(context, flowFile, session, manager, block);
- } catch (final Exception e) {
- getLogger().error("Failed to bin {} due to {}", new
Object[] {flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
+ try {
+ for (final FlowFile flowFile : flowFiles) {
+ try {
+ binFlowFile(context, flowFile, session, manager,
block);
+ } catch (final Exception e) {
+ getLogger().error("Failed to bin {} due to {}",
flowFile, e, e);
+ session.transfer(flowFile, REL_FAILURE);
+ }
}
+ } finally {
+ session.commitAsync();
}
- } finally {
- session.commitAsync();
- }
- // If there is no more data queued up, or strategy is defragment,
complete any bin that meets our minimum threshold
- // Otherwise, run one more cycle to process queued FlowFiles to add
more fragment into available bins.
- int completedBins = 0;
- if (flowFiles.isEmpty() ||
MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+ // Complete any bins that have reached their expiration date
try {
- completedBins += manager.completeFullEnoughBins();
+ manager.completeExpiredBins();
} catch (final Exception e) {
- getLogger().error("Failed to merge FlowFiles to create new bin
due to " + e, e);
+ getLogger().error("Failed to merge FlowFiles to create new bin
due to {}", e, e);
}
}
- // Complete any bins that have reached their expiration date
- try {
- completedBins += manager.completeExpiredBins();
- } catch (final Exception e) {
- getLogger().error("Failed to merge FlowFiles to create new bin due
to " + e, e);
- }
+ if (isScheduled()) {
+ // Complete any bins that have reached their expiration date
+ try {
+ manager.completeExpiredBins();
+ } catch (final Exception e) {
+ getLogger().error("Failed to merge FlowFiles to create new bin
due to {}", e, e);
+ }
+
+ // Complete any bins that meet their minimum size requirements
+ try {
+ manager.completeFullEnoughBins();
+ } catch (final Exception e) {
+ getLogger().error("Failed to merge FlowFiles to create new bin
due to {}", e, e);
+ }
- if (completedBins == 0 && flowFiles.isEmpty()) {
- getLogger().debug("No FlowFiles to bin; will yield");
+ getLogger().debug("No more FlowFiles to bin; will yield");
context.yield();
}
}
@@ -386,7 +394,7 @@ public class MergeRecord extends
AbstractSessionFactoryProcessor {
final RecordSchema schema = reader.getSchema();
final String groupId = getGroupId(context, flowFile, schema,
session);
- getLogger().debug("Got Group ID {} for {}", new Object[] {groupId,
flowFile});
+ getLogger().debug("Got Group ID {} for {}", groupId, flowFile);
binManager.add(groupId, flowFile, reader, session, block);
} catch (MalformedRecordException | IOException |
SchemaNotFoundException e) {
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
index 6a0293b..7dfffb7 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -205,6 +205,11 @@ public class RecordBin {
return false;
}
+ if (thresholds.getFragmentCountAttribute().isPresent()) {
+ // Defragment strategy: Compare with the target fragment count.
+ return this.fragmentCount == thresholds.getFragmentCount();
+ }
+
int maxRecords = thresholds.getMaxRecords();
if (recordCount >= maxRecords) {
@@ -241,7 +246,7 @@ public class RecordBin {
}
if (thresholds.getFragmentCountAttribute().isPresent()) {
- // Compare with the target fragment count.
+ // Defragment strategy: Compare with the target fragment count.
return this.fragmentCount == thresholds.getFragmentCount();
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index f6ca6a1..ea3ace6 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -78,13 +78,10 @@ public class TestMergeRecord {
@Test
public void testMergeSimple() {
- runner.setProperty(MergeRecord.MIN_RECORDS, "2");
- runner.setProperty(MergeRecord.MAX_RECORDS, "2");
-
runner.enqueue("Name, Age\nJohn, 35");
runner.enqueue("Name, Age\nJane, 34");
- runner.run(2);
+ runner.run(1);
runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
@@ -101,12 +98,11 @@ public class TestMergeRecord {
@Test
public void testDifferentSchema() {
runner.setProperty(MergeRecord.MIN_RECORDS, "2");
- runner.setProperty(MergeRecord.MAX_RECORDS, "2");
runner.enqueue("Name, Age\nJohn, 35");
runner.enqueue("Name, Color\nJane, Red");
- runner.run(2, false, true);
+ runner.run(1, false, true);
runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
@@ -114,7 +110,7 @@ public class TestMergeRecord {
runner.enqueue("Name, Age\nJane, 34");
runner.enqueue("Name, Color\nJohn, Blue");
- runner.run(2, true, false);
+ runner.run(1, true, false);
runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
@@ -133,8 +129,7 @@ public class TestMergeRecord {
@Test
public void testFailureToParse() {
- runner.setProperty(MergeRecord.MIN_RECORDS, "2");
- runner.setProperty(MergeRecord.MAX_RECORDS, "3");
+ runner.setProperty(MergeRecord.MIN_RECORDS, "3");
readerService.failAfter(2);
@@ -329,7 +324,6 @@ public class TestMergeRecord {
@Test
public void testMinSize() {
runner.setProperty(MergeRecord.MIN_RECORDS, "2");
- runner.setProperty(MergeRecord.MAX_RECORDS, "2");
runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
runner.enqueue("Name, Age\nJohn, 35");
@@ -367,7 +361,6 @@ public class TestMergeRecord {
@Test
public void testMinRecords() {
runner.setProperty(MergeRecord.MIN_RECORDS, "103");
- runner.setProperty(MergeRecord.MAX_RECORDS, "110");
runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
runner.enqueue("Name, Age\nJohn, 35");
@@ -384,7 +377,7 @@ public class TestMergeRecord {
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
runner.enqueue("Name, Age\nJohn, 35");
- runner.run(2);
+ runner.run();
runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
}
@@ -477,8 +470,6 @@ public class TestMergeRecord {
@Test
public void testDefragmentOldestBinFailsWhenTooManyBins() {
- runner.setProperty(MergeRecord.MIN_RECORDS, "5");
- runner.setProperty(MergeRecord.MAX_RECORDS, "10");
runner.setProperty(MergeRecord.MAX_BIN_COUNT, "5");
runner.setProperty(MergeRecord.MERGE_STRATEGY,
MergeRecord.MERGE_STRATEGY_DEFRAGMENT);