This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bd4b49abe NIFI-11369: Fixed Defragment strategy with optional fragment.count attribute in MergeContent
3bd4b49abe is described below

commit 3bd4b49abe51d656950072ddcbf24dace70266e4
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Tue May 16 22:11:39 2023 +0200

    NIFI-11369: Fixed Defragment strategy with optional fragment.count attribute in MergeContent
---
 .../org/apache/nifi/processor/util/bin/Bin.java    | 32 +++++---
 .../nifi/processors/standard/MergeContent.java     | 26 +++---
 .../additionalDetails.html                         | 16 ++--
 .../nifi/processors/standard/TestMergeContent.java | 96 +++++++++++++++++++++-
 4 files changed, 140 insertions(+), 30 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java b/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
index 76175c99be..f702545e6d 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
@@ -44,8 +44,8 @@ public class Bin {
     private final long minimumSizeBytes;
     private final long maximumSizeBytes;
 
-    private volatile int minimumEntries = 0;
-    private volatile int maximumEntries = Integer.MAX_VALUE;
+    private volatile int minimumEntries;
+    private volatile int maximumEntries;
     private final String fileCountAttribute;
     private volatile EvictionReason evictionReason = EvictionReason.UNSET;
 
@@ -67,12 +67,22 @@ public class Bin {
      */
     public Bin(final ProcessSession session, final long minSizeBytes, final 
long maxSizeBytes, final int minEntries, final int maxEntries, final String 
fileCountAttribute) {
         this.session = session;
-        this.minimumSizeBytes = minSizeBytes;
-        this.maximumSizeBytes = maxSizeBytes;
-        this.minimumEntries = minEntries;
-        this.maximumEntries = maxEntries;
         this.fileCountAttribute = fileCountAttribute;
 
+        if (this.fileCountAttribute != null ) {
+            // Merge Strategy = Defragment
+            // FlowFiles will be merged based on fragment.* attributes
+            this.minimumSizeBytes = 0;
+            this.maximumSizeBytes = Long.MAX_VALUE;
+            this.minimumEntries = Integer.MAX_VALUE;
+            this.maximumEntries = Integer.MAX_VALUE;
+        } else {
+            this.minimumSizeBytes = minSizeBytes;
+            this.maximumSizeBytes = maxSizeBytes;
+            this.minimumEntries = minEntries;
+            this.maximumEntries = maxEntries;
+        }
+
         this.creationMomentEpochNs = System.nanoTime();
         if (minSizeBytes > maxSizeBytes) {
             throw new IllegalArgumentException();
@@ -164,16 +174,16 @@ public class Bin {
         if (fileCountAttribute != null) {
             final String countValue = 
flowFile.getAttribute(fileCountAttribute);
             final Integer count = toInteger(countValue);
-            if (count == null) {
-                return false;
+            if (count != null) {
+                // set the limits for the bin as an exact count when the count 
attribute arrives
+                this.maximumEntries = count;
+                this.minimumEntries = count;
             }
-            this.maximumEntries = count;
-            this.minimumEntries = count;
 
             final String index = 
flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE);
             if (index == null || index.isEmpty() || !binIndexSet.add(index)) {
                 // Do not accept flowfile with duplicate fragment index value
-                logger.warn("Duplicate or missing value for '" + 
FRAGMENT_INDEX_ATTRIBUTE + "' in defragment mode. Flowfile {} not allowed in 
Bin", new Object[] { flowFile });
+                logger.warn("Duplicate or missing value for '" + 
FRAGMENT_INDEX_ATTRIBUTE + "' in defragment mode. Flowfile {} not allowed in 
Bin", flowFile);
                 successiveFailedOfferings++;
                 return false;
             }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 4a1802ef4e..c0ce6ade8a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -120,9 +120,8 @@ import java.util.zip.ZipOutputStream;
         + "\"fragment.identifier\" attribute and the same value for the 
\"fragment.index\" attribute, the first FlowFile processed will be "
         + "accepted and subsequent FlowFiles will not be accepted into the 
Bin."),
     @ReadsAttribute(attribute = "fragment.count", description = "Applicable 
only if the <Merge Strategy> property is set to Defragment. This "
-        + "attribute must be present on all FlowFiles with the same value for 
the fragment.identifier attribute. All FlowFiles in the same "
-        + "bundle must have the same value for this attribute. The value of 
this attribute indicates how many FlowFiles should be expected "
-        + "in the given bundle."),
+        + "attribute indicates how many FlowFiles should be expected in the 
given bundle. At least one FlowFile must have this attribute in "
+        + "the bundle. If multiple FlowFiles contain the \"fragment.count\" 
attribute in a given bundle, all must have the same value."),
     @ReadsAttribute(attribute = "segment.original.filename", description = 
"Applicable only if the <Merge Strategy> property is set to Defragment. "
         + "This attribute must be present on all FlowFiles with the same value 
for the fragment.identifier attribute. All FlowFiles in the same "
         + "bundle must have the same value for this attribute. The value of 
this attribute will be used for the filename of the completed merged "
@@ -572,16 +571,23 @@ public class MergeContent extends BinFiles {
             fragmentIdentifier = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
 
             final String fragmentCount = 
flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE);
-            if (!isNumber(fragmentCount)) {
-                return "Cannot Defragment " + flowFile + " because it does not 
have an integer value for the " + FRAGMENT_COUNT_ATTRIBUTE + " attribute";
-            } else if (decidedFragmentCount == null) {
-                decidedFragmentCount = fragmentCount;
-            } else if (!decidedFragmentCount.equals(fragmentCount)) {
-                return "Cannot Defragment " + flowFile + " because it is 
grouped with another FlowFile, and the two have differing values for the "
-                        + FRAGMENT_COUNT_ATTRIBUTE + " attribute: " + 
decidedFragmentCount + " and " + fragmentCount;
+            if (fragmentCount != null) {
+                if (!isNumber(fragmentCount)) {
+                    return "Cannot Defragment " + flowFile + " because it does 
not have an integer value for the " + FRAGMENT_COUNT_ATTRIBUTE + " attribute";
+                } else if (decidedFragmentCount == null) {
+                    decidedFragmentCount = fragmentCount;
+                } else if (!decidedFragmentCount.equals(fragmentCount)) {
+                    return "Cannot Defragment " + flowFile + " because it is 
grouped with another FlowFile, and the two have differing values for the "
+                            + FRAGMENT_COUNT_ATTRIBUTE + " attribute: " + 
decidedFragmentCount + " and " + fragmentCount;
+                }
             }
         }
 
+        if (decidedFragmentCount == null) {
+            return "Cannot Defragment FlowFiles with Fragment Identifier " + 
fragmentIdentifier + " because no FlowFile arrived with the " + 
FRAGMENT_COUNT_ATTRIBUTE + " attribute "
+                + "and the expected number of fragments is unknown";
+        }
+
         final int numericFragmentCount;
         try {
             numericFragmentCount = Integer.parseInt(decidedFragmentCount);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.html
index de6ab8459b..6582620f33 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.html
@@ -58,9 +58,11 @@
     The "Defragment" Merge Strategy can be used when FlowFiles need to be 
explicitly assigned to the same bin. For example, if data is split apart using
     the UnpackContent Processor, each unpacked FlowFile can be processed 
independently and later merged back together using this Processor with the
     Merge Strategy set to Defragment. In order for FlowFiles to be added to 
the same bin when using this configuration, the FlowFiles must have the same
-    value for the "fragment.identifier" attribute. Each FlowFile with the same 
identifier must also have the same value for the "fragment.count" attribute
-    (which indicates how many FlowFiles belong in the bin) and a unique value 
for the "fragment.index" attribute so that the FlowFiles can be ordered
-    correctly. <b>NOTE:</b> while there are valid use cases for breaking apart 
FlowFiles and later re-merging them, it is an anti-pattern to take a larger 
FlowFile,
+    value for the "fragment.identifier" attribute. Each FlowFile with the same 
identifier must also have a unique value for the "fragment.index" attribute
+    so that the FlowFiles can be ordered correctly. For a given 
"fragment.identifier", at least one FlowFile must have the "fragment.count" 
attribute
+    (which indicates how many FlowFiles belong in the bin). Other FlowFiles 
with the same identifier must have the same value for the "fragment.count" 
attribute,
+    or they can omit this attribute.
+    <b>NOTE:</b> while there are valid use cases for breaking apart FlowFiles 
and later re-merging them, it is an anti-pattern to take a larger FlowFile,
     break it into a million tiny FlowFiles, and then re-merge them. Doing so 
can result in using huge amounts of Java heap and can result in Out Of Memory 
Errors.
     Additionally, it adds large amounts of load to the NiFi framework. This 
can result in increased CPU and disk utilization and often times can be an 
order of magnitude
     lower throughput and an order of magnitude higher latency. As an 
alternative, whenever possible, dataflows should be built to make use of 
Record-oriented processors,
@@ -70,7 +72,7 @@
 <p>
     In order to be added to the same bin, two FlowFiles must be 'like 
FlowFiles.' In order for two FlowFiles to be like FlowFiles, they must have the 
same
     schema, and if the &lt;Correlation Attribute Name&gt; property is set, 
they must have the same value for the specified attribute. For example, if the
-    &lt;Correlation Attribute Name&gt; is set to "filename" then two FlowFiles 
must have the same value for the "filename" attribute in order to be binned
+    &lt;Correlation Attribute Name&gt; is set to "filename", then two 
FlowFiles must have the same value for the "filename" attribute in order to be 
binned
     together. If more than one attribute is needed in order to correlate two 
FlowFiles, it is recommended to use an UpdateAttribute processor before the
     MergeContent processor and combine the attributes. For example, if the 
goal is to bin together two FlowFiles only if they have the same value for the
     "abc" attribute and the "xyz" attribute, then we could accomplish this by 
using UpdateAttribute and adding a property with name "correlation.attribute"
@@ -87,7 +89,7 @@
 </p>
 
 <p>
-    If the &lt;Merge Strategy&gt; property is set to "Bin Packing Algorithm" 
then then the following rules will be evaluated.
+    If the &lt;Merge Strategy&gt; property is set to "Bin Packing Algorithm", 
then the following rules will be evaluated.
 </p>
 
 <p>
@@ -107,7 +109,7 @@
 </p>
 
 <p>
-    If the &lt;Merge Strategy&gt; property is set to "Defragment" then a bin 
is full only when the number of FlowFiles in the bin is equal to the number 
specified
+    If the &lt;Merge Strategy&gt; property is set to "Defragment", then a bin 
is full only when the number of FlowFiles in the bin is equal to the number 
specified
     by the "fragment.count" attribute of one of the FlowFiles in the bin. All 
FlowFiles that have this attribute must have the same value for this attribute,
     or else they will be routed to the "failure" relationship. It is not 
necessary that all FlowFiles have this value, but at least one FlowFile in the 
bin must have
     this value or the bin will never be complete. If all of the necessary 
FlowFiles are not binned together by the point at which the bin times amount
@@ -152,7 +154,7 @@
     </tr>
     <tr>
         <td>BIN_MANAGER_FULL</td>
-        <td>If an incoming FlowFile does not fit into any of the existing Bins 
(either due to the Maximum thresholds set, or due to the Correlation Attribute 
being used, etc.) then a new Bin
+        <td>If an incoming FlowFile does not fit into any of the existing Bins 
(either due to the Maximum thresholds set, or due to the Correlation Attribute 
being used, etc.), then a new Bin
             must be created for the incoming FlowFiles. If the number of 
active Bins is already equal to the &lt;Maximum number of Bins&gt; property, 
the oldest Bin will be merged in order to
             make room for the new Bin. In that case, the Bin Manager is said 
to be full, and this value will be used.</td>
     </tr>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index 3bfa55ff15..4b2984b6b4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -869,7 +869,99 @@ public class TestMergeContent {
     }
 
     @Test
-    public void testDefragmentDuplicateFragement() throws IOException, 
InterruptedException {
+    public void testDefragmentWithFragmentCountOnLastFragmentOnly() throws 
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        runner.setProperty(MergeContent.MERGE_STRATEGY, 
MergeContent.MERGE_STRATEGY_DEFRAGMENT);
+        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
+
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
+        runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
+
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
+        runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
+
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
+        runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
+
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
+        attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");
+        runner.enqueue("Panama".getBytes("UTF-8"), attributes);
+
+        runner.run();
+
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        final MockFlowFile assembled = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        assembled.assertContentEquals("A Man A Plan A Canal 
Panama".getBytes("UTF-8"));
+    }
+
+    @Test
+    public void testDefragmentWithFragmentCountOnMiddleFragment() throws 
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        runner.setProperty(MergeContent.MERGE_STRATEGY, 
MergeContent.MERGE_STRATEGY_DEFRAGMENT);
+        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
+
+        final String fragmentId = "Fragment Id";
+
+        runner.enqueue("Fragment 1 without count ".getBytes("UTF-8"), new 
HashMap<String, String>() {{
+            put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
+            put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
+        }});
+
+        runner.enqueue("Fragment 2 with count ".getBytes("UTF-8"), new 
HashMap<String, String>() {{
+            put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
+            put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
+            put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "3");
+        }});
+
+        runner.enqueue("Fragment 3 without count".getBytes("UTF-8"), new 
HashMap<String, String>() {{
+            put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
+            put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
+        }});
+
+        runner.run();
+
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        final MockFlowFile assembled = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        assembled.assertContentEquals("Fragment 1 without count Fragment 2 
with count Fragment 3 without count".getBytes("UTF-8"));
+    }
+
+    @Test
+    public void testDefragmentWithDifferentFragmentCounts() throws IOException 
{
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        runner.setProperty(MergeContent.MERGE_STRATEGY, 
MergeContent.MERGE_STRATEGY_DEFRAGMENT);
+        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
+
+        final String fragmentId = "Fragment Id";
+
+        runner.enqueue("Fragment 1 with count ".getBytes("UTF-8"), new 
HashMap<String, String>() {{
+            put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
+            put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
+            put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        }});
+
+        runner.enqueue("Fragment 2 with count ".getBytes("UTF-8"), new 
HashMap<String, String>() {{
+            put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
+            put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
+            put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "3");
+        }});
+
+        runner.enqueue("Fragment 3 without count".getBytes("UTF-8"), new 
HashMap<String, String>() {{
+            put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
+            put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
+        }});
+
+        runner.run();
+
+        runner.assertTransferCount(MergeContent.REL_MERGED, 0);
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 3);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 0);
+    }
+
+    @Test
+    public void testDefragmentDuplicateFragment() throws IOException, 
InterruptedException {
         final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
         runner.setProperty(MergeContent.MERGE_STRATEGY, 
MergeContent.MERGE_STRATEGY_DEFRAGMENT);
         runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
@@ -904,7 +996,7 @@ public class TestMergeContent {
     }
 
     @Test
-    public void testDefragmentWithTooManyFragements() throws IOException {
+    public void testDefragmentWithTooManyFragments() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
         runner.setProperty(MergeContent.MERGE_STRATEGY, 
MergeContent.MERGE_STRATEGY_DEFRAGMENT);
         runner.setProperty(MergeContent.MAX_ENTRIES, "3");

Reply via email to