This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2115d68ae7 NIFI-14166: Added "Bin Termination Check" property to
MergeContent; updated documentation to include the new property and more
clearly explain when bins get merged.
2115d68ae7 is described below
commit 2115d68ae7729857d823cb49f8180c8a9ce4302e
Author: Mark Payne <[email protected]>
AuthorDate: Thu Jan 16 09:19:29 2025 -0500
NIFI-14166: Added "Bin Termination Check" property to MergeContent; updated
documentation to include the new property and more clearly explain when bins
get merged.
Signed-off-by: Pierre Villard <[email protected]>
This closes #9639.
---
.../org/apache/nifi/processor/util/bin/Bin.java | 35 +++--
.../apache/nifi/processor/util/bin/BinManager.java | 114 ++++++++--------
.../nifi/processor/util/bin/EvictionReason.java | 2 +
...{EvictionReason.java => InsertionLocation.java} | 36 +++---
.../nifi/processors/standard/MergeContent.java | 38 ++++++
.../additionalDetails.md | 22 +++-
.../nifi/processors/standard/TestMergeContent.java | 143 +++++++++++++++++++++
7 files changed, 303 insertions(+), 87 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
index 30797de200..a00c5adac7 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
@@ -48,6 +48,7 @@ public class Bin {
private volatile int maximumEntries;
private final String fileCountAttribute;
private volatile EvictionReason evictionReason = EvictionReason.UNSET;
+ private volatile boolean forcefullyCompleted = false;
private final List<FlowFile> binContents = new ArrayList<>();
private final Set<String> binIndexSet = new HashSet<>();
@@ -93,6 +94,14 @@ public class Bin {
return session;
}
+ /**
+ * Forcefully marks this bin as complete, regardless of how many entries or bytes it holds.
+ * Once completed, {@link #isFull()} returns true, {@link #offer(FlowFile, ProcessSession)}
+ * rejects any further FlowFiles, and {@link #determineEvictionReason()} reports
+ * {@link EvictionReason#BIN_TERMINATION_SIGNAL}.
+ */
+ public void complete() {
+ forcefullyCompleted = true;
+ }
+
+ /**
+ * @return true if {@link #complete()} has been called on this bin
+ */
+ public boolean isForcefullyCompleted() {
+ return forcefullyCompleted;
+ }
+
/**
* Indicates whether the bin has enough items to be considered full. This
is based on whether the current size of the bin is greater than the minimum
size in bytes and based on having a number of
* successive unsuccessful attempts to add a new item (because it is so
close to the max or the size of the objects being attempted do not favor tight
packing)
@@ -101,10 +110,14 @@ public class Bin {
*/
public boolean isFull() {
return (((size >= minimumSizeBytes) && binContents.size() >=
minimumEntries) && (successiveFailedOfferings > 5))
- || (size >= maximumSizeBytes) || (binContents.size() >=
maximumEntries);
+ || (size >= maximumSizeBytes) || (binContents.size() >=
maximumEntries)
+ || forcefullyCompleted;
}
- public EvictionReason determineFullness() {
+ public EvictionReason determineEvictionReason() {
+ if (forcefullyCompleted) {
+ return EvictionReason.BIN_TERMINATION_SIGNAL;
+ }
if (size >= maximumSizeBytes) {
return EvictionReason.MAX_BYTES_THRESHOLD_REACHED;
}
@@ -165,21 +178,13 @@ public class Bin {
* @return true if added; false otherwise
*/
public boolean offer(final FlowFile flowFile, final ProcessSession
session) {
- if (((size + flowFile.getSize()) > maximumSizeBytes) ||
(binContents.size() >= maximumEntries)) {
+ if (forcefullyCompleted || ((size + flowFile.getSize()) >
maximumSizeBytes) || (binContents.size() >= maximumEntries)) {
successiveFailedOfferings++;
return false;
}
// fileCountAttribute is non-null for defragment mode
if (fileCountAttribute != null) {
- final String countValue =
flowFile.getAttribute(fileCountAttribute);
- final Integer count = toInteger(countValue);
- if (count != null) {
- // set the limits for the bin as an exact count when the count
attribute arrives
- this.maximumEntries = count;
- this.minimumEntries = count;
- }
-
final String index =
flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE);
if (index == null || index.isEmpty() || !binIndexSet.add(index)) {
// Do not accept flowfile with duplicate fragment index value
@@ -187,6 +192,14 @@ public class Bin {
successiveFailedOfferings++;
return false;
}
+
+ final String countValue =
flowFile.getAttribute(fileCountAttribute);
+ final Integer count = toInteger(countValue);
+ if (count != null) {
+ // set the limits for the bin as an exact count when the count
attribute arrives
+ this.maximumEntries = count;
+ this.minimumEntries = count;
+ }
}
size += flowFile.getSize();
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
index 72ce099b1f..5e611d29fa 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
/**
* This class is thread safe
@@ -45,6 +46,8 @@ public class BinManager {
private final AtomicInteger minEntries = new AtomicInteger(0);
private final AtomicInteger maxEntries = new
AtomicInteger(Integer.MAX_VALUE);
private final AtomicReference<String> fileCountAttribute = new
AtomicReference<>(null);
+ private volatile Predicate<FlowFile> binTerminationCheck = ff -> false;
+ private volatile InsertionLocation insertionLocation =
InsertionLocation.LAST_IN_BIN;
private final AtomicInteger maxBinAgeSeconds = new
AtomicInteger(Integer.MAX_VALUE);
private final Map<String, List<Bin>> groupBinMap = new HashMap<>();
@@ -88,6 +91,16 @@ public class BinManager {
this.maxEntries.set(maximumEntries);
}
+ /**
+ * Sets the predicate that determines whether a FlowFile should terminate a bin, along with
+ * where that FlowFile should be placed when it does.
+ * <p>
+ * Until this is called, the check is a predicate that never matches and the insertion location
+ * is {@link InsertionLocation#LAST_IN_BIN}. A {@code null} predicate is treated as never matching.
+ * </p>
+ * <p>
+ * NOTE(review): nothing visible here resets these settings other than calling this method again.
+ * A caller that reuses a BinManager and later disables termination should call this with a
+ * predicate that always returns false (confirm callers do so).
+ * </p>
+ *
+ * @param binTerminationCheck the predicate to use to determine if a FlowFile should terminate a bin
+ * @param insertionLocation where a terminating FlowFile is placed: appended to the bin it terminates,
+ * placed first in a newly created bin, or isolated in a bin of its own
+ */
+ public void setBinTermination(final Predicate<FlowFile>
binTerminationCheck, final InsertionLocation insertionLocation) {
+ this.binTerminationCheck = binTerminationCheck;
+ this.insertionLocation = insertionLocation;
+ }
+
public int getBinCount() {
rLock.lock();
try {
@@ -121,39 +134,8 @@ public class BinManager {
* @return true if added; false if no bin exists which can fit this item
and no bin can be created based on current min/max criteria
*/
public boolean offer(final String groupIdentifier, final FlowFile
flowFile, final ProcessSession session, final ProcessSessionFactory
sessionFactory) {
- final long currentMaxSizeBytes = maxSizeBytes.get();
- if (flowFile.getSize() > currentMaxSizeBytes) { //won't fit into any
new bins (and probably none existing)
- return false;
- }
- wLock.lock();
- try {
- final List<Bin> currentBins = groupBinMap.get(groupIdentifier);
- if (currentBins == null) { // this is a new group we need to
register
- final List<Bin> bins = new ArrayList<>();
- final Bin bin = new Bin(sessionFactory.createSession(),
minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
- maxEntries.get(), fileCountAttribute.get());
- bins.add(bin);
- groupBinMap.put(groupIdentifier, bins);
- binCount++;
- return bin.offer(flowFile, session);
- } else {
- for (final Bin bin : currentBins) {
- final boolean accepted = bin.offer(flowFile, session);
- if (accepted) {
- return true;
- }
- }
-
- //if we've reached this point then we couldn't fit it into any
existing bins - gotta make a new one
- final Bin bin = new Bin(sessionFactory.createSession(),
minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
- maxEntries.get(), fileCountAttribute.get());
- currentBins.add(bin);
- binCount++;
- return bin.offer(flowFile, session);
- }
- } finally {
- wLock.unlock();
- }
+ final Set<FlowFile> unbinned = offer(groupIdentifier,
List.of(flowFile), session, sessionFactory);
+ return unbinned.isEmpty();
}
/**
@@ -179,11 +161,39 @@ public class BinManager {
continue;
}
+ final boolean terminatesBin = binTerminationCheck != null &&
binTerminationCheck.test(flowFile);
+
final List<Bin> currentBins =
groupBinMap.computeIfAbsent(groupIdentifier, k -> new ArrayList<>());
- for (final Bin bin : currentBins) {
- final boolean accepted = bin.offer(flowFile, session);
- if (accepted) {
- continue flowFileLoop;
+ if (terminatesBin) {
+ if (insertionLocation == InsertionLocation.LAST_IN_BIN) {
+ for (final Bin bin : currentBins) {
+ final boolean accepted = bin.offer(flowFile,
session);
+ if (accepted) {
+ bin.complete();
+
bin.setEvictionReason(EvictionReason.BIN_TERMINATION_SIGNAL);
+
+ continue flowFileLoop;
+ }
+ }
+ } else if (!currentBins.isEmpty()) {
+ for (final Bin bin : currentBins) {
+ if (bin.isForcefullyCompleted()) {
+ continue;
+ }
+
+ bin.complete();
+
bin.setEvictionReason(EvictionReason.BIN_TERMINATION_SIGNAL);
+ break;
+
+ // Note that we intentionally do not continue the
flowFileLoop here because we want to create a new bin
+ }
+ }
+ } else {
+ for (final Bin bin : currentBins) {
+ final boolean accepted = bin.offer(flowFile, session);
+ if (accepted) {
+ continue flowFileLoop;
+ }
}
}
@@ -194,7 +204,12 @@ public class BinManager {
currentBins.add(bin);
binCount++;
final boolean added = bin.offer(flowFile, session);
- if (!added) {
+ if (added) {
+ if (terminatesBin && (insertionLocation ==
InsertionLocation.ISOLATED || insertionLocation ==
InsertionLocation.LAST_IN_BIN)) {
+ bin.complete();
+
bin.setEvictionReason(EvictionReason.BIN_TERMINATION_SIGNAL);
+ }
+ } else {
unbinned.add(flowFile);
}
@@ -223,10 +238,10 @@ public class BinManager {
final List<Bin> remainingBins = new ArrayList<>();
for (final Bin bin : group.getValue()) {
if (relaxFullnessConstraint && bin.isFullEnough()) {
- bin.setEvictionReason(bin.determineFullness());
+ bin.setEvictionReason(bin.determineEvictionReason());
readyBins.add(bin);
} else if (!relaxFullnessConstraint && bin.isFull()) {
//strict check
- bin.setEvictionReason(bin.determineFullness());
+ bin.setEvictionReason(bin.determineEvictionReason());
readyBins.add(bin);
} else if (relaxFullnessConstraint &&
bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS)) {
bin.setEvictionReason(EvictionReason.TIMEOUT);
@@ -279,23 +294,4 @@ public class BinManager {
}
}
- /**
- * @return true if any current bins are older than the allowable max
- */
- public boolean containsOldBins() {
- rLock.lock();
- try {
- for (final List<Bin> bins : groupBinMap.values()) {
- for (final Bin bin : bins) {
- if (bin.isOlderThan(maxBinAgeSeconds.get(),
TimeUnit.SECONDS)) {
- return true;
- }
- }
- }
- } finally {
- rLock.unlock();
- }
- return false;
- }
-
}
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/EvictionReason.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/EvictionReason.java
index ddafc20e66..e113891800 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/EvictionReason.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/EvictionReason.java
@@ -28,6 +28,8 @@ public enum EvictionReason {
BIN_MANAGER_FULL("The oldest Bin was removed because incoming FlowFile
could not be placed in an existing Bin, and the Maximum Number of Bins was
reached"),
+ BIN_TERMINATION_SIGNAL("A FlowFile signaled that the Bin should be
terminated"),
+
UNSET("No reason was determined");
private final String explanation;
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/EvictionReason.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/InsertionLocation.java
similarity index 50%
copy from
nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/EvictionReason.java
copy to
nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/InsertionLocation.java
index ddafc20e66..b820c08911 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/EvictionReason.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/InsertionLocation.java
@@ -17,29 +17,33 @@
package org.apache.nifi.processor.util.bin;
-public enum EvictionReason {
- MAX_BYTES_THRESHOLD_REACHED("Maximum number of bytes (Max Group Size)
reached"),
+import org.apache.nifi.components.DescribedValue;
- MAX_ENTRIES_THRESHOLD_REACHED("Maximum number of entries reached"),
+/**
+ * Specifies where a FlowFile that satisfies a bin termination check is placed.
+ * Both the value and the display name of each constant are its human-readable name
+ * (for example "Last in Bin"), not the enum constant name; the description explains the placement.
+ */
+public enum InsertionLocation implements DescribedValue {
+ LAST_IN_BIN("Last in Bin", "Insert the FlowFile at the end of the Bin that
is terminated"),
+ FIRST_IN_NEW_BIN("First in New Bin", "Insert the FlowFile at the beginning
of a newly created Bin"),
+ ISOLATED("Isolated", "Insert the FlowFile into a new Bin and terminate the
Bin immediately with the FlowFile as the only content");
- MIN_THRESHOLDS_REACHED("Minimum number of bytes (Min Group Size) and
minimum number of entries reached"),
+ private final String name;
+ private final String description;
- TIMEOUT("Max Bin Age reached"),
-
- BIN_MANAGER_FULL("The oldest Bin was removed because incoming FlowFile
could not be placed in an existing Bin, and the Maximum Number of Bins was
reached"),
-
- UNSET("No reason was determined");
+ InsertionLocation(final String name, final String description) {
+ this.name = name;
+ this.description = description;
+ }
- private final String explanation;
- EvictionReason(final String explanation) {
- this.explanation = explanation;
+ // The persisted property value is the human-readable name, not name()/ordinal().
+ @Override
+ public String getValue() {
+ return name;
}
- public String getExplanation() {
- return explanation;
+ @Override
+ public String getDisplayName() {
+ return name;
}
- public String toString() {
- return explanation;
+ @Override
+ public String getDescription() {
+ return description;
}
}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index c57576174f..34e25bbbf1 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -44,10 +44,12 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -62,6 +64,7 @@ import org.apache.nifi.processor.util.bin.Bin;
import org.apache.nifi.processor.util.bin.BinFiles;
import org.apache.nifi.processor.util.bin.BinManager;
import org.apache.nifi.processor.util.bin.BinProcessingResult;
+import org.apache.nifi.processor.util.bin.InsertionLocation;
import org.apache.nifi.processors.standard.merge.AttributeStrategy;
import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
import org.apache.nifi.stream.io.NonCloseableOutputStream;
@@ -95,6 +98,7 @@ import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipException;
@@ -395,6 +399,31 @@ public class MergeContent extends BinFiles {
.dependsOn(MERGE_FORMAT, MERGE_FORMAT_TAR)
.build();
+ // Optional Expression Language check evaluated against each incoming FlowFile; a 'true' result
+ // terminates the FlowFile's bin. Only shown for Bin-Packing, and the bin manager is configured
+ // with it only when the fragment-count (defragment) attribute is not in use.
+ public static final PropertyDescriptor BIN_TERMINATION_CHECK = new
PropertyDescriptor.Builder()
+ .name("Bin Termination Check")
+ .description("""
+ Specifies an Expression Language Expression that is to be
evaluated against each FlowFile. If the result of the expression is 'true', the
+ bin that the FlowFile corresponds to will be terminated, even if
the bin has not met the minimum number of entries or minimum size.
+ Note that if the FlowFile that triggers the termination of the bin
is itself larger than the Maximum Bin Size, it will be placed into its
+ own bin without triggering the termination of any other bin. When
using this property, it is recommended to use Prioritizers in the flow's
+ connections to ensure that the ordering is as desired.
+ """)
+ .required(false)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.BOOLEAN,
false))
+ .dependsOn(MERGE_STRATEGY, MERGE_STRATEGY_BIN_PACK)
+ .build();
+
+ // Where a FlowFile that satisfies BIN_TERMINATION_CHECK is placed (see InsertionLocation);
+ // only shown when BIN_TERMINATION_CHECK is set.
+ public static final PropertyDescriptor FLOWFILE_INSERTION_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("FlowFile Insertion Strategy")
+ .description("If a given FlowFile terminates the bin based on the <Bin
Termination Check> property, specifies where the FlowFile should be included in
the bin.")
+ .required(true)
+ .dependsOn(BIN_TERMINATION_CHECK)
+ .defaultValue(InsertionLocation.LAST_IN_BIN)
+ .allowableValues(InsertionLocation.class)
+ .build();
+
+
private static final List<PropertyDescriptor> PROPERTIES = List.of(
MERGE_STRATEGY,
MERGE_FORMAT,
@@ -405,6 +434,8 @@ public class MergeContent extends BinFiles {
addBinPackingDependency(MAX_ENTRIES),
addBinPackingDependency(MIN_SIZE),
addBinPackingDependency(MAX_SIZE),
+ BIN_TERMINATION_CHECK,
+ FLOWFILE_INSERTION_STRATEGY,
MAX_BIN_AGE,
MAX_BIN_COUNT,
DELIMITER_STRATEGY,
@@ -495,6 +526,13 @@ public class MergeContent extends BinFiles {
binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
} else {
binManager.setFileCountAttribute(null);
+
+ final PropertyValue terminationCheck =
context.getProperty(BIN_TERMINATION_CHECK);
+ if (terminationCheck.isSet()) {
+ final InsertionLocation insertionLocation =
context.getProperty(FLOWFILE_INSERTION_STRATEGY).asAllowableValue(InsertionLocation.class);
+ final Predicate<FlowFile> predicate = flowFile ->
terminationCheck.evaluateAttributeExpressions(flowFile).asBoolean();
+ binManager.setBinTermination(predicate, insertionLocation);
+ }
}
}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.md
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.md
index 2eb50f08ab..dc6a82cf8e 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.md
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.md
@@ -83,7 +83,7 @@ example, the user can specify the minimum number of FlowFiles
that must be packa
performed. The minimum number of bytes can also be configured. Additionally, a
maximum number of FlowFiles and bytes may
be specified.
-There are also two other conditions that will result in the contents of a Bin
being merged together. The Max Bin Age
+There are two other conditions that will result in the contents of a Bin being
merged together. The Max Bin Age
property specifies the maximum amount of time that FlowFiles can be binned
together before the bin is merged. This
property should almost always be set, as it provides a means to set a timeout
on a bin, so that even if data stops
flowing to the Processor for a while (due to a problem with an upstream
system, a source processor being stopped, etc.)
@@ -102,6 +102,25 @@ this value or the bin will never be complete. If all the
necessary FlowFiles are
which the bin times out (as specified by the <Max Bin Age> property), then
the FlowFiles will all be routed to the '
failure' relationship instead of being merged together.
+Finally, a bin can be merged if the <Bin Termination Check> property is configured and a FlowFile is received that
+satisfies the specified condition. The condition is specified as an Expression Language expression. If any FlowFile
+results in the expression returning a value of true, then the bin will be merged, regardless of how much data is in
+the bin or how old the bin is. The incoming FlowFile that triggers the bin to be merged can either be added as the
+last entry in the bin, as the first entry in a new bin, or output as its own bin, depending on the value of the
+<FlowFile Insertion Strategy> property.
+
+A bin of FlowFiles, then, is merged when any one of the following conditions
is met:
+- The bin has reached the maximum number of bytes, as configured by the <Max
Group Size> property.
+- The bin has reached the maximum number of FlowFiles, as configured by the
<Maximum Number of Entries> property.
+- The bin has reached both the minimum number of bytes, as configured by the
<Min Group Size> property,
+ AND the minimum number of FlowFiles, as configured by the <Minimum Number of
Entries> property.
+- The bin has reached the maximum age, as configured by the <Max Bin Age>
property.
+- The maximum number of bins has been reached, as configured by the <Maximum
number of Bins> property, and a new bin must be created.
+- The <Bin Termination Check> property is configured and a FlowFile is
received that satisfies the specified condition.
+
+
+### Reason for Merge
+
Whenever the contents of a Bin are merged, an attribute with the name
"merge.reason" will be added to the merged
FlowFile. The below table provides a listing of all possible values for this
attribute with an explanation of each.
@@ -112,6 +131,7 @@ FlowFile. The below table provides a listing of all
possible values for this att
| MIN\_THRESHOLDS\_REACHED | The bin has reached both the minimum
number of bytes, as configured by the <Min Group Size> property, AND the
minimum number of FlowFiles, as configured by the <Minimum Number of Entries>
property. The bin has not reached the maximum number of bytes (Max Group Size)
OR the maximum number of FlowFiles (Maximum Number of Entries).
[...]
| TIMEOUT | The Bin has reached the maximum age, as
configured by the <Max Bin Age> property. If this threshold is reached, the
contents of the Bin will be merged together, even if the Bin has not yet
reached either of the minimum thresholds. Note that the age here is determined
by when the Bin was created, NOT the age of the FlowFiles that reside within
those Bins. As a result, if the Processor is stopped until it has 1 million
FlowFiles queued, each one being 1 [...]
| BIN\_MANAGER\_FULL | If an incoming FlowFile does not fit into
any of the existing Bins (either due to the Maximum thresholds set, or due to
the Correlation Attribute being used, etc.), then a new Bin must be created for
the incoming FlowFiles. If the number of active Bins is already equal to the
<Maximum number of Bins> property, the oldest Bin will be merged in order to
make room for the new Bin. In that case, the Bin Manager is said to be full,
and this value will be u [...]
+| BIN\_TERMINATION\_SIGNAL | A FlowFile signaled that the Bin should
be terminated by satisfying the configured <Bin Termination Check> property.
[...]
Note that the attribute value is minimally named, while the textual
description is far more verbose. This is done for a
few reasons. Firstly, storing a large value for the attribute can be more
costly, utilizing more heap space and
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index 079edc8aa4..e89c7cd95f 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -33,6 +33,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.StandardFlowFileMediaType;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.bin.InsertionLocation;
import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
@@ -48,6 +49,7 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -1338,6 +1340,147 @@ public class TestMergeContent {
assertEquals(2, runner.getQueueSize().getObjectCount());
}
+ // FIRST_IN_NEW_BIN: each terminating FlowFile closes the currently open bin and becomes the first
+ // entry of a new bin, so the final terminating FlowFile is left behind in an open bin.
+ @Test
+ public void testBinTerminationTriggerStartOfBin() {
+ final TestRunner runner = TestRunners.newTestRunner(new
MergeContent());
+ runner.setProperty(MergeContent.MIN_ENTRIES, "10");
+ runner.setProperty(MergeContent.MAX_ENTRIES, "25");
+ runner.setProperty(MergeContent.MAX_BIN_COUNT, "3");
+ runner.setProperty(MergeContent.BIN_TERMINATION_CHECK,
"${termination:equals('true')}");
+ runner.setProperty(MergeContent.FLOWFILE_INSERTION_STRATEGY,
InsertionLocation.FIRST_IN_NEW_BIN);
+
+ // Enqueue 5 FlowFiles, followed by a FlowFile with the 'termination' attribute set to 'true'.
+ // Do this 3 times.
+ int flowFileIndex = 0;
+ for (int j = 0; j < 3; j++) {
+ for (int i = 0; i < 6; i++) {
+ final Map<String, String> attributes = (i == 5) ?
Map.of("termination", "true") : Collections.emptyMap();
+ runner.enqueue((flowFileIndex++) + "\n", attributes);
+ }
+ }
+
+ runner.run(2);
+
+ runner.assertTransferCount(MergeContent.REL_MERGED, 3);
+
+ // We should get out 17 because the last FlowFile has not yet been transferred out, since it is the start of a new bin.
+ runner.assertTransferCount(MergeContent.REL_ORIGINAL, 17);
+
+ final List<MockFlowFile> merged =
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
+ merged.getFirst().assertContentEquals("0\n1\n2\n3\n4\n");
+ merged.get(1).assertContentEquals("5\n6\n7\n8\n9\n10\n");
+ merged.get(2).assertContentEquals("11\n12\n13\n14\n15\n16\n");
+ }
+
+ // LAST_IN_BIN: each terminating FlowFile is appended to the currently open bin, which is then
+ // merged immediately, so every FlowFile is transferred out.
+ @Test
+ public void testBinTerminationTriggerEndOfBin() {
+ final TestRunner runner = TestRunners.newTestRunner(new
MergeContent());
+ runner.setProperty(MergeContent.MIN_ENTRIES, "10");
+ runner.setProperty(MergeContent.MAX_ENTRIES, "25");
+ runner.setProperty(MergeContent.MAX_BIN_COUNT, "3");
+ runner.setProperty(MergeContent.BIN_TERMINATION_CHECK,
"${termination:equals('true')}");
+ runner.setProperty(MergeContent.FLOWFILE_INSERTION_STRATEGY,
InsertionLocation.LAST_IN_BIN);
+
+ // Enqueue 5 FlowFiles, followed by a FlowFile with the 'termination' attribute set to 'true'.
+ // Do this 3 times.
+ int flowFileIndex = 0;
+ for (int j = 0; j < 3; j++) {
+ for (int i = 0; i < 6; i++) {
+ final Map<String, String> attributes = (i == 5) ?
Map.of("termination", "true") : Collections.emptyMap();
+ runner.enqueue((flowFileIndex++) + "\n", attributes);
+ }
+ }
+
+ runner.run(2);
+
+ runner.assertTransferCount(MergeContent.REL_MERGED, 3);
+
+ // We should get out 18 because the last FlowFile ended the bin and was transferred out.
+ runner.assertTransferCount(MergeContent.REL_ORIGINAL, 18);
+
+ final List<MockFlowFile> merged =
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
+ merged.getFirst().assertContentEquals("0\n1\n2\n3\n4\n5\n");
+ merged.get(1).assertContentEquals("6\n7\n8\n9\n10\n11\n");
+ merged.get(2).assertContentEquals("12\n13\n14\n15\n16\n17\n");
+ }
+
+ // ISOLATED: each terminating FlowFile closes the currently open bin and is then merged on its own,
+ // producing two merged FlowFiles per group of six inputs.
+ @Test
+ public void testBinTerminationTriggerIsolated() {
+ final TestRunner runner = TestRunners.newTestRunner(new
MergeContent());
+ runner.setProperty(MergeContent.MIN_ENTRIES, "10");
+ runner.setProperty(MergeContent.MAX_ENTRIES, "25");
+ runner.setProperty(MergeContent.MAX_BIN_COUNT, "3");
+ runner.setProperty(MergeContent.BIN_TERMINATION_CHECK,
"${termination:equals('true')}");
+ runner.setProperty(MergeContent.FLOWFILE_INSERTION_STRATEGY,
InsertionLocation.ISOLATED);
+
+ // Enqueue 5 FlowFiles, followed by a FlowFile with the 'termination' attribute set to 'true'.
+ // Do this 3 times.
+ int flowFileIndex = 0;
+ for (int j = 0; j < 3; j++) {
+ for (int i = 0; i < 6; i++) {
+ final Map<String, String> attributes = (i == 5) ?
Map.of("termination", "true") : Collections.emptyMap();
+ runner.enqueue((flowFileIndex++) + "\n", attributes);
+ }
+ }
+
+ runner.run(2);
+
+ runner.assertTransferCount(MergeContent.REL_MERGED, 6);
+
+ // We should get out all 18 originals because each terminating FlowFile, including the last one, is merged in its own isolated bin.
+ runner.assertTransferCount(MergeContent.REL_ORIGINAL, 18);
+
+ final List<MockFlowFile> merged =
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
+ merged.getFirst().assertContentEquals("0\n1\n2\n3\n4\n");
+ merged.get(1).assertContentEquals("5\n");
+ merged.get(2).assertContentEquals("6\n7\n8\n9\n10\n");
+ merged.get(3).assertContentEquals("11\n");
+ merged.get(4).assertContentEquals("12\n13\n14\n15\n16\n");
+ merged.get(5).assertContentEquals("17\n");
+ }
+
+ // FIRST_IN_NEW_BIN combined with a Correlation Attribute: a termination signal only affects the
+ // bin of its own correlation group.
+ @Test
+ public void testTerminationTriggerWithCorrelationAttribute() {
+ final TestRunner runner = TestRunners.newTestRunner(new
MergeContent());
+ runner.setProperty(MergeContent.MIN_ENTRIES, "10");
+ runner.setProperty(MergeContent.MAX_ENTRIES, "25");
+ runner.setProperty(MergeContent.MAX_BIN_COUNT, "3");
+ runner.setProperty(MergeContent.BIN_TERMINATION_CHECK,
"${termination:equals('true')}");
+ runner.setProperty(MergeContent.FLOWFILE_INSERTION_STRATEGY,
InsertionLocation.FIRST_IN_NEW_BIN);
+ runner.setProperty(MergeContent.CORRELATION_ATTRIBUTE_NAME,
"correlation");
+
+ // Enqueue FlowFiles across two correlation groups, '1' and '2':
+ // - Group '1' gets FlowFiles with content 0 and 1 in one bin.
+ // - Group '2' gets the FlowFile with content 2.
+ // - FlowFile '3' (group '1', termination=true) terminates the bin holding 0 and 1 and starts a new bin.
+ // - FlowFile '4' (group '1', termination=true) terminates the bin holding 3 and starts another new bin.
+ // - Ten more FlowFiles (content 0 through 9) are added to group '2', filling its bin without a termination signal.
+ // The bin that starts with FlowFile '4' is left open, so FlowFile '4' is not transferred out.
+ final Map<String, String> terminationAttributes =
Map.of("termination", "true", "correlation", "1");
+ final Map<String, String> correlation1 = Map.of("correlation", "1");
+ final Map<String, String> correlation2 = Map.of("correlation", "2");
+
+ runner.enqueue("0\n", correlation1);
+ runner.enqueue("1\n", correlation1);
+ runner.enqueue("2\n", correlation2);
+ runner.enqueue("3\n", terminationAttributes);
+ runner.enqueue("4\n", terminationAttributes);
+
+ for (int i = 0; i < 10; i++) {
+ runner.enqueue(i + "\n", correlation2);
+ }
+
+ runner.run(2);
+
+ runner.assertTransferCount(MergeContent.REL_MERGED, 3);
+ runner.assertTransferCount(MergeContent.REL_ORIGINAL, 14);
+
+ final List<MockFlowFile> merged =
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
+ merged.getFirst().assertContentEquals("0\n1\n");
+ merged.get(1).assertContentEquals("3\n");
+ merged.get(2).assertContentEquals("2\n0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n");
+ }
+
private void createFlowFiles(final TestRunner testRunner) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(),
"application/plain-text");