tpalfy commented on code in PR #7254:
URL: https://github.com/apache/nifi/pull/7254#discussion_r1204097351
##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java:
##########
@@ -120,9 +120,9 @@
+ "\"fragment.identifier\" attribute and the same value for the
\"fragment.index\" attribute, the first FlowFile processed will be "
+ "accepted and subsequent FlowFiles will not be accepted into the
Bin."),
@ReadsAttribute(attribute = "fragment.count", description = "Applicable
only if the <Merge Strategy> property is set to Defragment. This "
- + "attribute must be present on all FlowFiles with the same value for
the fragment.identifier attribute. All FlowFiles in the same "
- + "bundle must have the same value for this attribute. The value of
this attribute indicates how many FlowFiles should be expected "
- + "in the given bundle."),
+ + "attribute indicates how many FlowFiles should be expected in the
given bundle. At least one FlowFile must have this attribute in "
+ + "the bundle and it may be the last one only. If multiple FlowFiles
contain the \"fragment.count\" attribute in a given bundle, "
+ + "all must have the same value."),
Review Comment:
```suggestion
+ "attribute indicates how many FlowFiles should be expected in the
given bundle. At least one FlowFile must have this attribute in "
+ "the bundle. If multiple FlowFiles contain the \"fragment.count\"
attribute in a given bundle, "
+ "all must have the same value."),
```
##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java:
##########
@@ -869,7 +869,36 @@ public void testDefragment() throws IOException {
}
@Test
- public void testDefragmentDuplicateFragement() throws IOException,
InterruptedException {
+ public void testDefragmentWithFragmentCountOnLastFragmentOnly() throws
IOException {
Review Comment:
The fragment count doesn't need to be on the last fragment; any fragment in the bundle may carry it.
I recommend adding a test that puts it on a middle fragment, like this:
```java
@Test
public void testDefragmentWithFragmentCountOnMiddleFragment() throws
IOException {
final TestRunner runner = TestRunners.newTestRunner(new
MergeContent());
runner.setProperty(MergeContent.MERGE_STRATEGY,
MergeContent.MERGE_STRATEGY_DEFRAGMENT);
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
final String fragmentId = "Fragment Id";
runner.enqueue("Fragment 1 without count ".getBytes("UTF-8"), new
HashMap<String, String>() {{
put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
}});
runner.enqueue("Fragment 2 with count ".getBytes("UTF-8"), new
HashMap<String, String>() {{
put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "3");
}});
runner.enqueue("Fragment 3 without count".getBytes("UTF-8"), new
HashMap<String, String>() {{
put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
}});
runner.run();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
final MockFlowFile assembled =
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
assembled.assertContentEquals("Fragment 1 without count Fragment 2
with count Fragment 3 without count".getBytes("UTF-8"));
}
```
##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java:
##########
@@ -869,7 +869,36 @@ public void testDefragment() throws IOException {
}
@Test
- public void testDefragmentDuplicateFragement() throws IOException,
InterruptedException {
+ public void testDefragmentWithFragmentCountOnLastFragmentOnly() throws
IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
MergeContent());
+ runner.setProperty(MergeContent.MERGE_STRATEGY,
MergeContent.MERGE_STRATEGY_DEFRAGMENT);
+ runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
+
+ attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
+ runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
+
+ attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
+ runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
+
+ attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
+ runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
+
+ attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
+ attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");
+ runner.enqueue("Panama".getBytes("UTF-8"), attributes);
+
+ runner.run();
+
+ runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+ final MockFlowFile assembled =
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+ assembled.assertContentEquals("A Man A Plan A Canal
Panama".getBytes("UTF-8"));
+ }
+
Review Comment:
We might want to add a test verifying that conflicting fragment counts within one bundle route all of its FlowFiles to failure, like this:
```java
/**
 * Verifies that a Defragment bundle whose FlowFiles declare conflicting fragment.count values
 * (2 on the first fragment, 3 on the second) is not merged: every FlowFile of the bundle is
 * routed to failure and nothing is sent to merged or original.
 */
@Test
public void testDefragmentWithDifferentFragmentCounts() throws IOException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");

    final String fragmentId = "Fragment Id";

    // One plain map per FlowFile: double-brace initialization would create an anonymous
    // HashMap subclass per call that captures the enclosing test instance.
    final Map<String, String> fragment1Attributes = new HashMap<>();
    fragment1Attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
    fragment1Attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
    fragment1Attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "2");
    runner.enqueue("Fragment 1 with count 2 ".getBytes("UTF-8"), fragment1Attributes);

    final Map<String, String> fragment2Attributes = new HashMap<>();
    fragment2Attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
    fragment2Attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
    // Conflicts with the count of 2 declared by fragment 1.
    fragment2Attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "3");
    runner.enqueue("Fragment 2 with count 3 ".getBytes("UTF-8"), fragment2Attributes);

    final Map<String, String> fragment3Attributes = new HashMap<>();
    fragment3Attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
    fragment3Attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
    runner.enqueue("Fragment 3 without count".getBytes("UTF-8"), fragment3Attributes);

    runner.run();

    runner.assertTransferCount(MergeContent.REL_MERGED, 0);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 3);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 0);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]