This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3bd4b49abe NIFI-11369: Fixed Defragment strategy with optional
fragment.count attribute in MergeContent
3bd4b49abe is described below
commit 3bd4b49abe51d656950072ddcbf24dace70266e4
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Tue May 16 22:11:39 2023 +0200
NIFI-11369: Fixed Defragment strategy with optional fragment.count
attribute in MergeContent
---
.../org/apache/nifi/processor/util/bin/Bin.java | 32 +++++---
.../nifi/processors/standard/MergeContent.java | 26 +++---
.../additionalDetails.html | 16 ++--
.../nifi/processors/standard/TestMergeContent.java | 96 +++++++++++++++++++++-
4 files changed, 140 insertions(+), 30 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java b/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
index 76175c99be..f702545e6d 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
@@ -44,8 +44,8 @@ public class Bin {
private final long minimumSizeBytes;
private final long maximumSizeBytes;
- private volatile int minimumEntries = 0;
- private volatile int maximumEntries = Integer.MAX_VALUE;
+ private volatile int minimumEntries;
+ private volatile int maximumEntries;
private final String fileCountAttribute;
private volatile EvictionReason evictionReason = EvictionReason.UNSET;
@@ -67,12 +67,22 @@ public class Bin {
*/
public Bin(final ProcessSession session, final long minSizeBytes, final
long maxSizeBytes, final int minEntries, final int maxEntries, final String
fileCountAttribute) {
this.session = session;
- this.minimumSizeBytes = minSizeBytes;
- this.maximumSizeBytes = maxSizeBytes;
- this.minimumEntries = minEntries;
- this.maximumEntries = maxEntries;
this.fileCountAttribute = fileCountAttribute;
+ if (this.fileCountAttribute != null ) {
+ // Merge Strategy = Defragment
+ // FlowFiles will be merged based on fragment.* attributes
+ this.minimumSizeBytes = 0;
+ this.maximumSizeBytes = Long.MAX_VALUE;
+ this.minimumEntries = Integer.MAX_VALUE;
+ this.maximumEntries = Integer.MAX_VALUE;
+ } else {
+ this.minimumSizeBytes = minSizeBytes;
+ this.maximumSizeBytes = maxSizeBytes;
+ this.minimumEntries = minEntries;
+ this.maximumEntries = maxEntries;
+ }
+
this.creationMomentEpochNs = System.nanoTime();
if (minSizeBytes > maxSizeBytes) {
throw new IllegalArgumentException();
@@ -164,16 +174,16 @@ public class Bin {
if (fileCountAttribute != null) {
final String countValue =
flowFile.getAttribute(fileCountAttribute);
final Integer count = toInteger(countValue);
- if (count == null) {
- return false;
+ if (count != null) {
+ // set the limits for the bin as an exact count when the count
attribute arrives
+ this.maximumEntries = count;
+ this.minimumEntries = count;
}
- this.maximumEntries = count;
- this.minimumEntries = count;
final String index =
flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE);
if (index == null || index.isEmpty() || !binIndexSet.add(index)) {
// Do not accept flowfile with duplicate fragment index value
- logger.warn("Duplicate or missing value for '" +
FRAGMENT_INDEX_ATTRIBUTE + "' in defragment mode. Flowfile {} not allowed in
Bin", new Object[] { flowFile });
+ logger.warn("Duplicate or missing value for '" +
FRAGMENT_INDEX_ATTRIBUTE + "' in defragment mode. Flowfile {} not allowed in
Bin", flowFile);
successiveFailedOfferings++;
return false;
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 4a1802ef4e..c0ce6ade8a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -120,9 +120,8 @@ import java.util.zip.ZipOutputStream;
+ "\"fragment.identifier\" attribute and the same value for the
\"fragment.index\" attribute, the first FlowFile processed will be "
+ "accepted and subsequent FlowFiles will not be accepted into the
Bin."),
@ReadsAttribute(attribute = "fragment.count", description = "Applicable
only if the <Merge Strategy> property is set to Defragment. This "
- + "attribute must be present on all FlowFiles with the same value for
the fragment.identifier attribute. All FlowFiles in the same "
- + "bundle must have the same value for this attribute. The value of
this attribute indicates how many FlowFiles should be expected "
- + "in the given bundle."),
+ + "attribute indicates how many FlowFiles should be expected in the
given bundle. At least one FlowFile must have this attribute in "
+ + "the bundle. If multiple FlowFiles contain the \"fragment.count\"
attribute in a given bundle, all must have the same value."),
@ReadsAttribute(attribute = "segment.original.filename", description =
"Applicable only if the <Merge Strategy> property is set to Defragment. "
+ "This attribute must be present on all FlowFiles with the same value
for the fragment.identifier attribute. All FlowFiles in the same "
+ "bundle must have the same value for this attribute. The value of
this attribute will be used for the filename of the completed merged "
@@ -572,16 +571,23 @@ public class MergeContent extends BinFiles {
fragmentIdentifier = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
final String fragmentCount =
flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE);
- if (!isNumber(fragmentCount)) {
- return "Cannot Defragment " + flowFile + " because it does not
have an integer value for the " + FRAGMENT_COUNT_ATTRIBUTE + " attribute";
- } else if (decidedFragmentCount == null) {
- decidedFragmentCount = fragmentCount;
- } else if (!decidedFragmentCount.equals(fragmentCount)) {
- return "Cannot Defragment " + flowFile + " because it is
grouped with another FlowFile, and the two have differing values for the "
- + FRAGMENT_COUNT_ATTRIBUTE + " attribute: " +
decidedFragmentCount + " and " + fragmentCount;
+ if (fragmentCount != null) {
+ if (!isNumber(fragmentCount)) {
+ return "Cannot Defragment " + flowFile + " because it does
not have an integer value for the " + FRAGMENT_COUNT_ATTRIBUTE + " attribute";
+ } else if (decidedFragmentCount == null) {
+ decidedFragmentCount = fragmentCount;
+ } else if (!decidedFragmentCount.equals(fragmentCount)) {
+ return "Cannot Defragment " + flowFile + " because it is
grouped with another FlowFile, and the two have differing values for the "
+ + FRAGMENT_COUNT_ATTRIBUTE + " attribute: " +
decidedFragmentCount + " and " + fragmentCount;
+ }
}
}
+ if (decidedFragmentCount == null) {
+ return "Cannot Defragment FlowFiles with Fragment Identifier " +
fragmentIdentifier + " because no FlowFile arrived with the " +
FRAGMENT_COUNT_ATTRIBUTE + " attribute "
+ + "and the expected number of fragments is unknown";
+ }
+
final int numericFragmentCount;
try {
numericFragmentCount = Integer.parseInt(decidedFragmentCount);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.html
index de6ab8459b..6582620f33 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeContent/additionalDetails.html
@@ -58,9 +58,11 @@
The "Defragment" Merge Strategy can be used when FlowFiles need to be
explicitly assigned to the same bin. For example, if data is split apart using
the UnpackContent Processor, each unpacked FlowFile can be processed
independently and later merged back together using this Processor with the
Merge Strategy set to Defragment. In order for FlowFiles to be added to
the same bin when using this configuration, the FlowFiles must have the same
- value for the "fragment.identifier" attribute. Each FlowFile with the same
identifier must also have the same value for the "fragment.count" attribute
- (which indicates how many FlowFiles belong in the bin) and a unique value
for the "fragment.index" attribute so that the FlowFiles can be ordered
- correctly. <b>NOTE:</b> while there are valid use cases for breaking apart
FlowFiles and later re-merging them, it is an anti-pattern to take a larger
FlowFile,
+ value for the "fragment.identifier" attribute. Each FlowFile with the same
identifier must also have a unique value for the "fragment.index" attribute
+ so that the FlowFiles can be ordered correctly. For a given
"fragment.identifier", at least one FlowFile must have the "fragment.count"
attribute
+ (which indicates how many FlowFiles belong in the bin). Other FlowFiles
with the same identifier must have the same value for the "fragment.count"
attribute,
+ or they can omit this attribute.
+ <b>NOTE:</b> while there are valid use cases for breaking apart FlowFiles
and later re-merging them, it is an anti-pattern to take a larger FlowFile,
break it into a million tiny FlowFiles, and then re-merge them. Doing so
can result in using huge amounts of Java heap and can result in Out Of Memory
Errors.
Additionally, it adds large amounts of load to the NiFi framework. This
can result in increased CPU and disk utilization and often times can be an
order of magnitude
lower throughput and an order of magnitude higher latency. As an
alternative, whenever possible, dataflows should be built to make use of
Record-oriented processors,
@@ -70,7 +72,7 @@
<p>
In order to be added to the same bin, two FlowFiles must be 'like
FlowFiles.' In order for two FlowFiles to be like FlowFiles, they must have the
same
schema, and if the <Correlation Attribute Name> property is set,
they must have the same value for the specified attribute. For example, if the
- <Correlation Attribute Name> is set to "filename" then two FlowFiles
must have the same value for the "filename" attribute in order to be binned
+ <Correlation Attribute Name> is set to "filename", then two
FlowFiles must have the same value for the "filename" attribute in order to be
binned
together. If more than one attribute is needed in order to correlate two
FlowFiles, it is recommended to use an UpdateAttribute processor before the
MergeContent processor and combine the attributes. For example, if the
goal is to bin together two FlowFiles only if they have the same value for the
"abc" attribute and the "xyz" attribute, then we could accomplish this by
using UpdateAttribute and adding a property with name "correlation.attribute"
@@ -87,7 +89,7 @@
</p>
<p>
- If the <Merge Strategy> property is set to "Bin Packing Algorithm"
then then the following rules will be evaluated.
+ If the <Merge Strategy> property is set to "Bin Packing Algorithm",
then the following rules will be evaluated.
</p>
<p>
@@ -107,7 +109,7 @@
</p>
<p>
- If the <Merge Strategy> property is set to "Defragment" then a bin
is full only when the number of FlowFiles in the bin is equal to the number
specified
+ If the <Merge Strategy> property is set to "Defragment", then a bin
is full only when the number of FlowFiles in the bin is equal to the number
specified
by the "fragment.count" attribute of one of the FlowFiles in the bin. All
FlowFiles that have this attribute must have the same value for this attribute,
or else they will be routed to the "failure" relationship. It is not
necessary that all FlowFiles have this value, but at least one FlowFile in the
bin must have
this value or the bin will never be complete. If all of the necessary
FlowFiles are not binned together by the point at which the bin times amount
@@ -152,7 +154,7 @@
</tr>
<tr>
<td>BIN_MANAGER_FULL</td>
- <td>If an incoming FlowFile does not fit into any of the existing Bins
(either due to the Maximum thresholds set, or due to the Correlation Attribute
being used, etc.) then a new Bin
+ <td>If an incoming FlowFile does not fit into any of the existing Bins
(either due to the Maximum thresholds set, or due to the Correlation Attribute
being used, etc.), then a new Bin
must be created for the incoming FlowFiles. If the number of
active Bins is already equal to the <Maximum number of Bins> property,
the oldest Bin will be merged in order to
make room for the new Bin. In that case, the Bin Manager is said
to be full, and this value will be used.</td>
</tr>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index 3bfa55ff15..4b2984b6b4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -869,7 +869,99 @@ public class TestMergeContent {
}
@Test
- public void testDefragmentDuplicateFragement() throws IOException,
InterruptedException {
+ public void testDefragmentWithFragmentCountOnLastFragmentOnly() throws
IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
MergeContent());
+ runner.setProperty(MergeContent.MERGE_STRATEGY,
MergeContent.MERGE_STRATEGY_DEFRAGMENT);
+ runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
+
+ attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
+ runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
+
+ attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
+ runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
+
+ attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
+ runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
+
+ attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
+ attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");
+ runner.enqueue("Panama".getBytes("UTF-8"), attributes);
+
+ runner.run();
+
+ runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+ final MockFlowFile assembled =
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+ assembled.assertContentEquals("A Man A Plan A Canal
Panama".getBytes("UTF-8"));
+ }
+
+ @Test
+ public void testDefragmentWithFragmentCountOnMiddleFragment() throws
IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
MergeContent());
+ runner.setProperty(MergeContent.MERGE_STRATEGY,
MergeContent.MERGE_STRATEGY_DEFRAGMENT);
+ runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
+
+ final String fragmentId = "Fragment Id";
+
+ runner.enqueue("Fragment 1 without count ".getBytes("UTF-8"), new
HashMap<String, String>() {{
+ put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
+ put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
+ }});
+
+ runner.enqueue("Fragment 2 with count ".getBytes("UTF-8"), new
HashMap<String, String>() {{
+ put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
+ put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
+ put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "3");
+ }});
+
+ runner.enqueue("Fragment 3 without count".getBytes("UTF-8"), new
HashMap<String, String>() {{
+ put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
+ put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
+ }});
+
+ runner.run();
+
+ runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+ final MockFlowFile assembled =
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+ assembled.assertContentEquals("Fragment 1 without count Fragment 2
with count Fragment 3 without count".getBytes("UTF-8"));
+ }
+
+ @Test
+ public void testDefragmentWithDifferentFragmentCounts() throws IOException
{
+ final TestRunner runner = TestRunners.newTestRunner(new
MergeContent());
+ runner.setProperty(MergeContent.MERGE_STRATEGY,
MergeContent.MERGE_STRATEGY_DEFRAGMENT);
+ runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
+
+ final String fragmentId = "Fragment Id";
+
+ runner.enqueue("Fragment 1 with count ".getBytes("UTF-8"), new
HashMap<String, String>() {{
+ put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
+ put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
+ put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "2");
+ }});
+
+ runner.enqueue("Fragment 2 with count ".getBytes("UTF-8"), new
HashMap<String, String>() {{
+ put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
+ put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
+ put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "3");
+ }});
+
+ runner.enqueue("Fragment 3 without count".getBytes("UTF-8"), new
HashMap<String, String>() {{
+ put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
+ put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
+ }});
+
+ runner.run();
+
+ runner.assertTransferCount(MergeContent.REL_MERGED, 0);
+ runner.assertTransferCount(MergeContent.REL_FAILURE, 3);
+ runner.assertTransferCount(MergeContent.REL_ORIGINAL, 0);
+ }
+
+ @Test
+ public void testDefragmentDuplicateFragment() throws IOException,
InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new
MergeContent());
runner.setProperty(MergeContent.MERGE_STRATEGY,
MergeContent.MERGE_STRATEGY_DEFRAGMENT);
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
@@ -904,7 +996,7 @@ public class TestMergeContent {
}
@Test
- public void testDefragmentWithTooManyFragements() throws IOException {
+ public void testDefragmentWithTooManyFragments() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new
MergeContent());
runner.setProperty(MergeContent.MERGE_STRATEGY,
MergeContent.MERGE_STRATEGY_DEFRAGMENT);
runner.setProperty(MergeContent.MAX_ENTRIES, "3");