This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2115d68ae7 NIFI-14166: Added "Bin Termination Check" property to 
MergeContent; updated documentation to include the new property and more 
clearly explain when bins get merged.
2115d68ae7 is described below

commit 2115d68ae7729857d823cb49f8180c8a9ce4302e
Author: Mark Payne <[email protected]>
AuthorDate: Thu Jan 16 09:19:29 2025 -0500

    NIFI-14166: Added "Bin Termination Check" property to MergeContent; updated 
documentation to include the new property and more clearly explain when bins 
get merged.
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #9639.
---
 .../org/apache/nifi/processor/util/bin/Bin.java    |  35 +++--
 .../apache/nifi/processor/util/bin/BinManager.java | 114 ++++++++--------
 .../nifi/processor/util/bin/EvictionReason.java    |   2 +
 ...{EvictionReason.java => InsertionLocation.java} |  36 +++---
 .../nifi/processors/standard/MergeContent.java     |  38 ++++++
 .../additionalDetails.md                           |  22 +++-
 .../nifi/processors/standard/TestMergeContent.java | 143 +++++++++++++++++++++
 7 files changed, 303 insertions(+), 87 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
 
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
index 30797de200..a00c5adac7 100644
--- 
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
+++ 
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
@@ -48,6 +48,7 @@ public class Bin {
     private volatile int maximumEntries;
     private final String fileCountAttribute;
     private volatile EvictionReason evictionReason = EvictionReason.UNSET;
+    private volatile boolean forcefullyCompleted = false;
 
     private final List<FlowFile> binContents = new ArrayList<>();
     private final Set<String> binIndexSet = new HashSet<>();
@@ -93,6 +94,14 @@ public class Bin {
         return session;
     }
 
+    public void complete() {
+        forcefullyCompleted = true;
+    }
+
+    public boolean isForcefullyCompleted() {
+        return forcefullyCompleted;
+    }
+
     /**
      * Indicates whether the bin has enough items to be considered full. This 
is based on whether the current size of the bin is greater than the minimum 
size in bytes and based on having a number of
      * successive unsuccessful attempts to add a new item (because it is so 
close to the max or the size of the objects being attempted do not favor tight 
packing)
@@ -101,10 +110,14 @@ public class Bin {
      */
     public boolean isFull() {
         return (((size >= minimumSizeBytes) && binContents.size() >= 
minimumEntries) && (successiveFailedOfferings > 5))
-                || (size >= maximumSizeBytes) || (binContents.size() >= 
maximumEntries);
+                || (size >= maximumSizeBytes) || (binContents.size() >= 
maximumEntries)
+                || forcefullyCompleted;
     }
 
-    public EvictionReason determineFullness() {
+    public EvictionReason determineEvictionReason() {
+        if (forcefullyCompleted) {
+            return EvictionReason.BIN_TERMINATION_SIGNAL;
+        }
         if (size >= maximumSizeBytes) {
             return EvictionReason.MAX_BYTES_THRESHOLD_REACHED;
         }
@@ -165,21 +178,13 @@ public class Bin {
      * @return true if added; false otherwise
      */
     public boolean offer(final FlowFile flowFile, final ProcessSession 
session) {
-        if (((size + flowFile.getSize()) > maximumSizeBytes) || 
(binContents.size() >= maximumEntries)) {
+        if (forcefullyCompleted || ((size + flowFile.getSize()) > 
maximumSizeBytes) || (binContents.size() >= maximumEntries)) {
             successiveFailedOfferings++;
             return false;
         }
 
         // fileCountAttribute is non-null for defragment mode
         if (fileCountAttribute != null) {
-            final String countValue = 
flowFile.getAttribute(fileCountAttribute);
-            final Integer count = toInteger(countValue);
-            if (count != null) {
-                // set the limits for the bin as an exact count when the count 
attribute arrives
-                this.maximumEntries = count;
-                this.minimumEntries = count;
-            }
-
             final String index = 
flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE);
             if (index == null || index.isEmpty() || !binIndexSet.add(index)) {
                 // Do not accept flowfile with duplicate fragment index value
@@ -187,6 +192,14 @@ public class Bin {
                 successiveFailedOfferings++;
                 return false;
             }
+
+            final String countValue = 
flowFile.getAttribute(fileCountAttribute);
+            final Integer count = toInteger(countValue);
+            if (count != null) {
+                // set the limits for the bin as an exact count when the count 
attribute arrives
+                this.maximumEntries = count;
+                this.minimumEntries = count;
+            }
         }
 
         size += flowFile.getSize();
diff --git 
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
 
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
index 72ce099b1f..5e611d29fa 100644
--- 
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
+++ 
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
 
 /**
  * This class is thread safe
@@ -45,6 +46,8 @@ public class BinManager {
     private final AtomicInteger minEntries = new AtomicInteger(0);
     private final AtomicInteger maxEntries = new 
AtomicInteger(Integer.MAX_VALUE);
     private final AtomicReference<String> fileCountAttribute = new 
AtomicReference<>(null);
+    private volatile Predicate<FlowFile> binTerminationCheck = ff -> false;
+    private volatile InsertionLocation insertionLocation = 
InsertionLocation.LAST_IN_BIN;
 
     private final AtomicInteger maxBinAgeSeconds = new 
AtomicInteger(Integer.MAX_VALUE);
     private final Map<String, List<Bin>> groupBinMap = new HashMap<>();
@@ -88,6 +91,16 @@ public class BinManager {
         this.maxEntries.set(maximumEntries);
     }
 
+    /**
+     * Sets the predicate that determines whether or not a FlowFile should 
terminate a bin and the location in the bin where the FlowFile should be 
inserted if so.
+     * @param binTerminationCheck the predicate to use to determine if a 
FlowFile should terminate a bin
+     * @param insertionLocation the location in the bin where the FlowFile 
should be inserted if it terminates the bin
+     */
+    public void setBinTermination(final Predicate<FlowFile> 
binTerminationCheck, final InsertionLocation insertionLocation) {
+        this.binTerminationCheck = binTerminationCheck;
+        this.insertionLocation = insertionLocation;
+    }
+
     public int getBinCount() {
         rLock.lock();
         try {
@@ -121,39 +134,8 @@ public class BinManager {
      * @return true if added; false if no bin exists which can fit this item 
and no bin can be created based on current min/max criteria
      */
     public boolean offer(final String groupIdentifier, final FlowFile 
flowFile, final ProcessSession session, final ProcessSessionFactory 
sessionFactory) {
-        final long currentMaxSizeBytes = maxSizeBytes.get();
-        if (flowFile.getSize() > currentMaxSizeBytes) { //won't fit into any 
new bins (and probably none existing)
-            return false;
-        }
-        wLock.lock();
-        try {
-            final List<Bin> currentBins = groupBinMap.get(groupIdentifier);
-            if (currentBins == null) { // this is a new group we need to 
register
-                final List<Bin> bins = new ArrayList<>();
-                final Bin bin = new Bin(sessionFactory.createSession(), 
minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
-                    maxEntries.get(), fileCountAttribute.get());
-                bins.add(bin);
-                groupBinMap.put(groupIdentifier, bins);
-                binCount++;
-                return bin.offer(flowFile, session);
-            } else {
-                for (final Bin bin : currentBins) {
-                    final boolean accepted = bin.offer(flowFile, session);
-                    if (accepted) {
-                        return true;
-                    }
-                }
-
-                //if we've reached this point then we couldn't fit it into any 
existing bins - gotta make a new one
-                final Bin bin = new Bin(sessionFactory.createSession(), 
minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
-                    maxEntries.get(), fileCountAttribute.get());
-                currentBins.add(bin);
-                binCount++;
-                return bin.offer(flowFile, session);
-            }
-        } finally {
-            wLock.unlock();
-        }
+        final Set<FlowFile> unbinned = offer(groupIdentifier, 
List.of(flowFile), session, sessionFactory);
+        return unbinned.isEmpty();
     }
 
     /**
@@ -179,11 +161,39 @@ public class BinManager {
                     continue;
                 }
 
+                final boolean terminatesBin = binTerminationCheck != null && 
binTerminationCheck.test(flowFile);
+
                 final List<Bin> currentBins = 
groupBinMap.computeIfAbsent(groupIdentifier, k -> new ArrayList<>());
-                for (final Bin bin : currentBins) {
-                    final boolean accepted = bin.offer(flowFile, session);
-                    if (accepted) {
-                        continue flowFileLoop;
+                if (terminatesBin) {
+                    if (insertionLocation == InsertionLocation.LAST_IN_BIN) {
+                        for (final Bin bin : currentBins) {
+                            final boolean accepted = bin.offer(flowFile, 
session);
+                            if (accepted) {
+                                bin.complete();
+                                
bin.setEvictionReason(EvictionReason.BIN_TERMINATION_SIGNAL);
+
+                                continue flowFileLoop;
+                            }
+                        }
+                    } else if (!currentBins.isEmpty()) {
+                        for (final Bin bin : currentBins) {
+                            if (bin.isForcefullyCompleted()) {
+                                continue;
+                            }
+
+                            bin.complete();
+                            
bin.setEvictionReason(EvictionReason.BIN_TERMINATION_SIGNAL);
+                            break;
+
+                            // Note that we intentionally do not continue the 
flowFileLoop here because we want to create a new bin
+                        }
+                    }
+                } else {
+                    for (final Bin bin : currentBins) {
+                        final boolean accepted = bin.offer(flowFile, session);
+                        if (accepted) {
+                            continue flowFileLoop;
+                        }
                     }
                 }
 
@@ -194,7 +204,12 @@ public class BinManager {
                 currentBins.add(bin);
                 binCount++;
                 final boolean added = bin.offer(flowFile, session);
-                if (!added) {
+                if (added) {
+                    if (terminatesBin && (insertionLocation == 
InsertionLocation.ISOLATED || insertionLocation == 
InsertionLocation.LAST_IN_BIN)) {
+                        bin.complete();
+                        
bin.setEvictionReason(EvictionReason.BIN_TERMINATION_SIGNAL);
+                    }
+                } else {
                     unbinned.add(flowFile);
                 }
 
@@ -223,10 +238,10 @@ public class BinManager {
                 final List<Bin> remainingBins = new ArrayList<>();
                 for (final Bin bin : group.getValue()) {
                     if (relaxFullnessConstraint && bin.isFullEnough()) {
-                        bin.setEvictionReason(bin.determineFullness());
+                        bin.setEvictionReason(bin.determineEvictionReason());
                         readyBins.add(bin);
                     } else if (!relaxFullnessConstraint && bin.isFull()) { 
//strict check
-                        bin.setEvictionReason(bin.determineFullness());
+                        bin.setEvictionReason(bin.determineEvictionReason());
                         readyBins.add(bin);
                     } else if (relaxFullnessConstraint && 
bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS)) {
                         bin.setEvictionReason(EvictionReason.TIMEOUT);
@@ -279,23 +294,4 @@ public class BinManager {
         }
     }
 
-    /**
-     * @return true if any current bins are older than the allowable max
-     */
-    public boolean containsOldBins() {
-        rLock.lock();
-        try {
-            for (final List<Bin> bins : groupBinMap.values()) {
-                for (final Bin bin : bins) {
-                    if (bin.isOlderThan(maxBinAgeSeconds.get(), 
TimeUnit.SECONDS)) {
-                        return true;
-                    }
-                }
-            }
-        } finally {
-            rLock.unlock();
-        }
-        return false;
-    }
-
 }
diff --git 
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/EvictionReason.java
 
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/EvictionReason.java
index ddafc20e66..e113891800 100644
--- 
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/EvictionReason.java
+++ 
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/EvictionReason.java
@@ -28,6 +28,8 @@ public enum EvictionReason {
 
     BIN_MANAGER_FULL("The oldest Bin was removed because incoming FlowFile 
could not be placed in an existing Bin, and the Maximum Number of Bins was 
reached"),
 
+    BIN_TERMINATION_SIGNAL("A FlowFile signaled that the Bin should be 
terminated"),
+
     UNSET("No reason was determined");
 
     private final String explanation;
diff --git 
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/EvictionReason.java
 
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/InsertionLocation.java
similarity index 50%
copy from 
nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/EvictionReason.java
copy to 
nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/InsertionLocation.java
index ddafc20e66..b820c08911 100644
--- 
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/EvictionReason.java
+++ 
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/InsertionLocation.java
@@ -17,29 +17,33 @@
 
 package org.apache.nifi.processor.util.bin;
 
-public enum EvictionReason {
-    MAX_BYTES_THRESHOLD_REACHED("Maximum number of bytes (Max Group Size) 
reached"),
+import org.apache.nifi.components.DescribedValue;
 
-    MAX_ENTRIES_THRESHOLD_REACHED("Maximum number of entries reached"),
+public enum InsertionLocation implements DescribedValue {
+    LAST_IN_BIN("Last in Bin", "Insert the FlowFile at the end of the Bin that 
is terminated"),
+    FIRST_IN_NEW_BIN("First in New Bin", "Insert the FlowFile at the beginning 
of a newly created Bin"),
+    ISOLATED("Isolated", "Insert the FlowFile into a new Bin and terminate the 
Bin immediately with the FlowFile as the only content");
 
-    MIN_THRESHOLDS_REACHED("Minimum number of bytes (Min Group Size) and 
minimum number of entries reached"),
+    private final String name;
+    private final String description;
 
-    TIMEOUT("Max Bin Age reached"),
-
-    BIN_MANAGER_FULL("The oldest Bin was removed because incoming FlowFile 
could not be placed in an existing Bin, and the Maximum Number of Bins was 
reached"),
-
-    UNSET("No reason was determined");
+    InsertionLocation(final String name, final String description) {
+        this.name = name;
+        this.description = description;
+    }
 
-    private final String explanation;
-    EvictionReason(final String explanation) {
-        this.explanation = explanation;
+    @Override
+    public String getValue() {
+        return name;
     }
 
-    public String getExplanation() {
-        return explanation;
+    @Override
+    public String getDisplayName() {
+        return name;
     }
 
-    public String toString() {
-        return explanation;
+    @Override
+    public String getDescription() {
+        return description;
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index c57576174f..34e25bbbf1 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -44,10 +44,12 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.documentation.UseCase;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.resource.ResourceCardinality;
 import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -62,6 +64,7 @@ import org.apache.nifi.processor.util.bin.Bin;
 import org.apache.nifi.processor.util.bin.BinFiles;
 import org.apache.nifi.processor.util.bin.BinManager;
 import org.apache.nifi.processor.util.bin.BinProcessingResult;
+import org.apache.nifi.processor.util.bin.InsertionLocation;
 import org.apache.nifi.processors.standard.merge.AttributeStrategy;
 import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
 import org.apache.nifi.stream.io.NonCloseableOutputStream;
@@ -95,6 +98,7 @@ import java.util.OptionalLong;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipException;
@@ -395,6 +399,31 @@ public class MergeContent extends BinFiles {
         .dependsOn(MERGE_FORMAT, MERGE_FORMAT_TAR)
         .build();
 
+    public static final PropertyDescriptor BIN_TERMINATION_CHECK = new 
PropertyDescriptor.Builder()
+            .name("Bin Termination Check")
+        .description("""
+            Specifies an Expression Language Expression that is to be 
evaluated against each FlowFile. If the result of the expression is 'true', the
+            bin that the FlowFile corresponds to will be terminated, even if 
the bin has not met the minimum number of entries or minimum size.
+            Note that if the FlowFile that triggers the termination of the bin 
is itself larger than the Maximum Bin Size, it will be placed into its
+            own bin without triggering the termination of any other bin. When 
using this property, it is recommended to use Prioritizers in the flow's
+            connections to ensure that the ordering is as desired.
+            """)
+        .required(false)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.BOOLEAN,
 false))
+        .dependsOn(MERGE_STRATEGY, MERGE_STRATEGY_BIN_PACK)
+        .build();
+
+    public static final PropertyDescriptor FLOWFILE_INSERTION_STRATEGY = new 
PropertyDescriptor.Builder()
+        .name("FlowFile Insertion Strategy")
+        .description("If a given FlowFile terminates the bin based on the <Bin 
Termination Check> property, specifies where the FlowFile should be included in 
the bin.")
+        .required(true)
+        .dependsOn(BIN_TERMINATION_CHECK)
+        .defaultValue(InsertionLocation.LAST_IN_BIN)
+        .allowableValues(InsertionLocation.class)
+        .build();
+
+
     private static final List<PropertyDescriptor> PROPERTIES = List.of(
             MERGE_STRATEGY,
             MERGE_FORMAT,
@@ -405,6 +434,8 @@ public class MergeContent extends BinFiles {
             addBinPackingDependency(MAX_ENTRIES),
             addBinPackingDependency(MIN_SIZE),
             addBinPackingDependency(MAX_SIZE),
+            BIN_TERMINATION_CHECK,
+            FLOWFILE_INSERTION_STRATEGY,
             MAX_BIN_AGE,
             MAX_BIN_COUNT,
             DELIMITER_STRATEGY,
@@ -495,6 +526,13 @@ public class MergeContent extends BinFiles {
             binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
         } else {
             binManager.setFileCountAttribute(null);
+
+            final PropertyValue terminationCheck = 
context.getProperty(BIN_TERMINATION_CHECK);
+            if (terminationCheck.isSet()) {
+                final InsertionLocation insertionLocation = 
context.getProperty(FLOWFILE_INSERTION_STRATEGY).asAllowableValue(InsertionLocation.class);
+                final Predicate<FlowFile> predicate = flowFile -> 
terminationCheck.evaluateAttributeExpressions(flowFile).asBoolean();
+                binManager.setBinTermination(predicate, insertionLocation);
+            }
         }
     }
 
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.md
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.md
index 2eb50f08ab..dc6a82cf8e 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.md
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.md
@@ -83,7 +83,7 @@ example, the user can specify the minimum number of FlowFiles 
that must be packa
 performed. The minimum number of bytes can also be configured. Additionally, a 
maximum number of FlowFiles and bytes may
 be specified.
 
-There are also two other conditions that will result in the contents of a Bin 
being merged together. The Max Bin Age
+There are two other conditions that will result in the contents of a Bin being 
merged together. The Max Bin Age
 property specifies the maximum amount of time that FlowFiles can be binned 
together before the bin is merged. This
 property should almost always be set, as it provides a means to set a timeout 
on a bin, so that even if data stops
 flowing to the Processor for a while (due to a problem with an upstream 
system, a source processor being stopped, etc.)
@@ -102,6 +102,25 @@ this value or the bin will never be complete. If all the 
necessary FlowFiles are
 which the bin times amount (as specified by the <Max Bin Age> property), then 
the FlowFiles will all be routed to the '
 failure' relationship instead of being merged together.
 
+Finally, a bin can be merged if the <Bin Termination Check> property is 
configured and a FlowFile is received that
+satisfies the specified condition. The condition is specified as an Expression 
Language expression. If any FlowFile
+results in the expression returning a value of true, then the bin will be 
merged, regardless of how much data is in
+the bin or how old the bin is. The incoming FlowFile that triggers the bin to 
be merged can either be added as the
+last entry in the bin, as the first entry in a new bin, or output as its own 
bin, depending on the value of the
+<FlowFile Insertion Strategy> property.
+
+A bin of FlowFiles, then, is merged when any one of the following conditions 
is met:
+- The bin has reached the maximum number of bytes, as configured by the <Max 
Group Size> property.
+- The bin has reached the maximum number of FlowFiles, as configured by the 
<Maximum Number of Entries> property.
+- The bin has reached both the minimum number of bytes, as configured by the 
<Min Group Size> property,
+  AND the minimum number of FlowFiles, as configured by the <Minimum Number of 
Entries> property.
+- The bin has reached the maximum age, as configured by the <Max Bin Age> 
property.
+- The maximum number of bins has been reached, as configured by the <Maximum 
number of Bins> property, and a new bin must be created.
+- The <Bin Termination Check> property is configured and a FlowFile is 
received that satisfies the specified condition.
+
+
+### Reason for Merge
+
 Whenever the contents of a Bin are merged, an attribute with the name 
"merge.reason" will be added to the merged
 FlowFile. The below table provides a listing of all possible values for this 
attribute with an explanation of each.
 
@@ -112,6 +131,7 @@ FlowFile. The below table provides a listing of all 
possible values for this att
 | MIN\_THRESHOLDS\_REACHED         | The bin has reached both the minimum 
number of bytes, as configured by the <Min Group Size> property, AND the 
minimum number of FlowFiles, as configured by the <Minimum Number of Entries> 
property. The bin has not reached the maximum number of bytes (Max Group Size) 
OR the maximum number of FlowFiles (Maximum Number of Entries).                 
                                                                                
                             [...]
 | TIMEOUT                          | The Bin has reached the maximum age, as 
configured by the <Max Bin Age> property. If this threshold is reached, the 
contents of the Bin will be merged together, even if the Bin has not yet 
reached either of the minimum thresholds. Note that the age here is determined 
by when the Bin was created, NOT the age of the FlowFiles that reside within 
those Bins. As a result, if the Processor is stopped until it has 1 million 
FlowFiles queued, each one being 1 [...]
 | BIN\_MANAGER\_FULL               | If an incoming FlowFile does not fit into 
any of the existing Bins (either due to the Maximum thresholds set, or due to 
the Correlation Attribute being used, etc.), then a new Bin must be created for 
the incoming FlowFiles. If the number of active Bins is already equal to the 
<Maximum number of Bins> property, the oldest Bin will be merged in order to 
make room for the new Bin. In that case, the Bin Manager is said to be full, 
and this value will be u [...]
+| BIN\_TERMINATION\_SIGNAL         | A FlowFile signaled that the Bin should 
be terminated by satisfying the configured <Bin Termination Check> property.    
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
 
 Note that the attribute value is minimally named, while the textual 
description is far more verbose. This is done for a
 few reasons. Firstly, storing a large value for the attribute can be more 
costly, utilizing more heap space and
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index 079edc8aa4..e89c7cd95f 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -33,6 +33,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.flowfile.attributes.StandardFlowFileMediaType;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.bin.InsertionLocation;
 import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
@@ -48,6 +49,7 @@ import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -1338,6 +1340,147 @@ public class TestMergeContent {
         assertEquals(2, runner.getQueueSize().getObjectCount());
     }
 
+    /**
+     * Verifies {@code InsertionLocation.FIRST_IN_NEW_BIN}: a FlowFile that satisfies the Bin
+     * Termination Check causes the current bin to be merged, and that FlowFile then becomes the
+     * first entry of the next bin.
+     */
+    @Test
+    public void testBinTerminationTriggerStartOfBin() {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        // NOTE(review): MIN_ENTRIES (10) exceeds the size of each group enqueued below (5 or 6),
+        // presumably so that only the termination check completes these bins.
+        runner.setProperty(MergeContent.MIN_ENTRIES, "10");
+        runner.setProperty(MergeContent.MAX_ENTRIES, "25");
+        runner.setProperty(MergeContent.MAX_BIN_COUNT, "3");
+        runner.setProperty(MergeContent.BIN_TERMINATION_CHECK, 
"${termination:equals('true')}");
+        runner.setProperty(MergeContent.FLOWFILE_INSERTION_STRATEGY, 
InsertionLocation.FIRST_IN_NEW_BIN);
+
+        // Enqueue 5 FlowFiles, followed by a FlowFile with the 'termination' 
attribute set to 'true'.
+        // Do this 3 times, so the FlowFiles with content 5, 11 and 17 satisfy the termination check.
+        int flowFileIndex = 0;
+        for (int j = 0; j < 3; j++) {
+            for (int i = 0; i < 6; i++) {
+                final Map<String, String> attributes = (i == 5) ? 
Map.of("termination", "true") : Collections.emptyMap();
+                runner.enqueue((flowFileIndex++) + "\n", attributes);
+            }
+        }
+
+        runner.run(2);
+
+        runner.assertTransferCount(MergeContent.REL_MERGED, 3);
+
+        // We should get out 17 because the last FlowFile has not yet been 
transferred out, since it is the start of a new bin.
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 17);
+
+        // Each terminating FlowFile (5, 11) leads the following merged bin rather than ending the previous one.
+        final List<MockFlowFile> merged = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
+        merged.getFirst().assertContentEquals("0\n1\n2\n3\n4\n");
+        merged.get(1).assertContentEquals("5\n6\n7\n8\n9\n10\n");
+        merged.get(2).assertContentEquals("11\n12\n13\n14\n15\n16\n");
+    }
+
+    /**
+     * Verifies {@code InsertionLocation.LAST_IN_BIN}: a FlowFile that satisfies the Bin
+     * Termination Check is added as the last entry of the current bin, and that bin is then merged.
+     */
+    @Test
+    public void testBinTerminationTriggerEndOfBin() {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        // NOTE(review): MIN_ENTRIES (10) exceeds the size of each group enqueued below (6),
+        // presumably so that only the termination check completes these bins.
+        runner.setProperty(MergeContent.MIN_ENTRIES, "10");
+        runner.setProperty(MergeContent.MAX_ENTRIES, "25");
+        runner.setProperty(MergeContent.MAX_BIN_COUNT, "3");
+        runner.setProperty(MergeContent.BIN_TERMINATION_CHECK, 
"${termination:equals('true')}");
+        runner.setProperty(MergeContent.FLOWFILE_INSERTION_STRATEGY, 
InsertionLocation.LAST_IN_BIN);
+
+        // Enqueue 5 FlowFiles, followed by a FlowFile with the 'termination' 
attribute set to 'true'.
+        // Do this 3 times, so the FlowFiles with content 5, 11 and 17 satisfy the termination check.
+        int flowFileIndex = 0;
+        for (int j = 0; j < 3; j++) {
+            for (int i = 0; i < 6; i++) {
+                final Map<String, String> attributes = (i == 5) ? 
Map.of("termination", "true") : Collections.emptyMap();
+                runner.enqueue((flowFileIndex++) + "\n", attributes);
+            }
+        }
+
+        runner.run(2);
+
+        runner.assertTransferCount(MergeContent.REL_MERGED, 3);
+
+        // We should get out 18 because the last FlowFile ended the bin and 
was transferred out.
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 18);
+
+        // Each terminating FlowFile (5, 11, 17) is the final entry of its merged bin.
+        final List<MockFlowFile> merged = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
+        merged.getFirst().assertContentEquals("0\n1\n2\n3\n4\n5\n");
+        merged.get(1).assertContentEquals("6\n7\n8\n9\n10\n11\n");
+        merged.get(2).assertContentEquals("12\n13\n14\n15\n16\n17\n");
+    }
+
+    /**
+     * Verifies {@code InsertionLocation.ISOLATED}: a FlowFile that satisfies the Bin Termination
+     * Check causes the current bin to be merged without joining it, and is itself merged on its own.
+     */
+    @Test
+    public void testBinTerminationTriggerIsolated() {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        // NOTE(review): MIN_ENTRIES (10) exceeds the size of each group enqueued below (6),
+        // presumably so that only the termination check completes these bins.
+        runner.setProperty(MergeContent.MIN_ENTRIES, "10");
+        runner.setProperty(MergeContent.MAX_ENTRIES, "25");
+        runner.setProperty(MergeContent.MAX_BIN_COUNT, "3");
+        runner.setProperty(MergeContent.BIN_TERMINATION_CHECK, 
"${termination:equals('true')}");
+        runner.setProperty(MergeContent.FLOWFILE_INSERTION_STRATEGY, 
InsertionLocation.ISOLATED);
+
+        // Enqueue 5 FlowFiles, followed by a FlowFile with the 'termination' 
attribute set to 'true'.
+        // Do this 3 times, so the FlowFiles with content 5, 11 and 17 satisfy the termination check.
+        int flowFileIndex = 0;
+        for (int j = 0; j < 3; j++) {
+            for (int i = 0; i < 6; i++) {
+                final Map<String, String> attributes = (i == 5) ? 
Map.of("termination", "true") : Collections.emptyMap();
+                runner.enqueue((flowFileIndex++) + "\n", attributes);
+            }
+        }
+
+        runner.run(2);
+
+        // 6 merges: one per group of 5 regular FlowFiles plus one per isolated terminating FlowFile.
+        runner.assertTransferCount(MergeContent.REL_MERGED, 6);
+
+        // We should get out 18 because each terminating FlowFile is merged in its own isolated bin,
+        // so every FlowFile (including the last one) was transferred out.
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 18);
+
+        final List<MockFlowFile> merged = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
+        merged.getFirst().assertContentEquals("0\n1\n2\n3\n4\n");
+        merged.get(1).assertContentEquals("5\n");
+        merged.get(2).assertContentEquals("6\n7\n8\n9\n10\n");
+        merged.get(3).assertContentEquals("11\n");
+        merged.get(4).assertContentEquals("12\n13\n14\n15\n16\n");
+        merged.get(5).assertContentEquals("17\n");
+    }
+
+    /**
+     * Verifies that, with a Correlation Attribute Name configured and
+     * {@code InsertionLocation.FIRST_IN_NEW_BIN}, terminating FlowFiles only close out bins for their
+     * own correlation value ('1'). The bin for correlation value '2' keeps accumulating entries across
+     * those termination events.
+     */
+    @Test
+    public void testTerminationTriggerWithCorrelationAttribute() {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        runner.setProperty(MergeContent.MIN_ENTRIES, "10");
+        runner.setProperty(MergeContent.MAX_ENTRIES, "25");
+        runner.setProperty(MergeContent.MAX_BIN_COUNT, "3");
+        runner.setProperty(MergeContent.BIN_TERMINATION_CHECK, 
"${termination:equals('true')}");
+        runner.setProperty(MergeContent.FLOWFILE_INSERTION_STRATEGY, 
InsertionLocation.FIRST_IN_NEW_BIN);
+        runner.setProperty(MergeContent.CORRELATION_ATTRIBUTE_NAME, 
"correlation");
+
+        // Enqueue FlowFiles. Bins are grouped by the 'correlation' attribute value ('1' or '2'):
+        // '1' should get values 0, 1
+        // Then, '2' should get value 2
+        // Then, FlowFile with content '3' (correlation '1', termination) should close the '1' bin and be the start of a new bin
+        // Then, FlowFile with content '4' (correlation '1', termination) should close that bin and be the start of another new bin
+        // Then, we add 10 additional FlowFiles to bin 2 without a termination signal, bringing it to 11 entries
+        // (at least MIN_ENTRIES) so that it is merged.
+        // This should leave the FlowFile with content '4' in the bin, but not transferred out.
+        final Map<String, String> terminationAttributes = 
Map.of("termination", "true", "correlation", "1");
+        final Map<String, String> correlation1 = Map.of("correlation", "1");
+        final Map<String, String> correlation2 = Map.of("correlation", "2");
+
+        runner.enqueue("0\n", correlation1);
+        runner.enqueue("1\n", correlation1);
+        runner.enqueue("2\n", correlation2);
+        runner.enqueue("3\n", terminationAttributes);
+        runner.enqueue("4\n", terminationAttributes);
+
+        // Note: the content of these FlowFiles (0..9) intentionally restarts at 0; they belong to correlation '2'.
+        for (int i = 0; i < 10; i++) {
+            runner.enqueue(i + "\n", correlation2);
+        }
+
+        runner.run(2);
+
+        // 14 originals = 2 (first '1' bin) + 1 (bin started by '3') + 11 (the '2' bin); '4' remains binned.
+        runner.assertTransferCount(MergeContent.REL_MERGED, 3);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 14);
+
+        final List<MockFlowFile> merged = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
+        merged.getFirst().assertContentEquals("0\n1\n");
+        merged.get(1).assertContentEquals("3\n");
+        merged.get(2).assertContentEquals("2\n0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n");
+    }
+
     private void createFlowFiles(final TestRunner testRunner) {
         final Map<String, String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.MIME_TYPE.key(), 
"application/plain-text");


Reply via email to