tpalfy commented on code in PR #7254:
URL: https://github.com/apache/nifi/pull/7254#discussion_r1204097351


##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java:
##########
@@ -120,9 +120,9 @@
         + "\"fragment.identifier\" attribute and the same value for the \"fragment.index\" attribute, the first FlowFile processed will be "
         + "accepted and subsequent FlowFiles will not be accepted into the Bin."),
     @ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This "
-        + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same "
-        + "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected "
-        + "in the given bundle."),
+        + "attribute indicates how many FlowFiles should be expected in the given bundle. At least one FlowFile must have this attribute in "
+        + "the bundle and it may be the last one only. If multiple FlowFiles contain the \"fragment.count\" attribute in a given bundle, "
+        + "all must have the same value."),

Review Comment:
   ```suggestion
           + "attribute indicates how many FlowFiles should be expected in the given bundle. At least one FlowFile must have this attribute in "
           + "the bundle. If multiple FlowFiles contain the \"fragment.count\" attribute in a given bundle, "
           + "all must have the same value."),
   ```
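
   To illustrate the rule the reworded description states, here is a minimal standalone sketch. It is not how MergeContent implements the check: the class `FragmentCountSketch` and the method `resolveExpectedCount` are hypothetical names, each FlowFile is modeled as a plain attribute map, and reporting a conflict with an exception is an assumption made only for this example.

   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;

   public class FragmentCountSketch {

       // Hypothetical helper, not part of NiFi: derives the expected bundle size
       // from whichever fragments carry the "fragment.count" attribute.
       public static Optional<Integer> resolveExpectedCount(final List<Map<String, String>> bundle) {
           Integer expected = null;
           for (final Map<String, String> attributes : bundle) {
               final String raw = attributes.get("fragment.count");
               if (raw == null) {
                   continue; // the attribute may be absent on any individual fragment
               }
               final int count = Integer.parseInt(raw);
               if (expected != null && expected != count) {
                   // Assumption: conflicting values are signalled with an exception here.
                   throw new IllegalStateException(
                           "Conflicting fragment.count values: " + expected + " and " + count);
               }
               expected = count;
           }
           // Empty when no fragment seen so far carries the attribute.
           return Optional.ofNullable(expected);
       }

       public static void main(final String[] args) {
           // Only the middle fragment carries the count, which is enough.
           final List<Map<String, String>> bundle = List.of(
                   Map.of("fragment.index", "1"),
                   Map.of("fragment.index", "2", "fragment.count", "3"),
                   Map.of("fragment.index", "3"));
           System.out.println(resolveExpectedCount(bundle)); // prints "Optional[3]"
       }
   }
   ```

   The position of the fragment that carries the count does not matter in this sketch, which is why the "last one only" wording can be dropped from the description.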



##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java:
##########
@@ -869,7 +869,36 @@ public void testDefragment() throws IOException {
     }
 
     @Test
-    public void testDefragmentDuplicateFragement() throws IOException, InterruptedException {
+    public void testDefragmentWithFragmentCountOnLastFragmentOnly() throws IOException {

Review Comment:
   The fragment count doesn't need to be on the last fragment. I recommend adding a test like this:
   ```java
       @Test
       public void testDefragmentWithFragmentCountOnMiddleFragment() throws IOException {
           final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
           runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
           runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");

           final String fragmentId = "Fragment Id";

           runner.enqueue("Fragment 1 without count ".getBytes("UTF-8"), new HashMap<String, String>() {{
               put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
               put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
           }});

           runner.enqueue("Fragment 2 with count ".getBytes("UTF-8"), new HashMap<String, String>() {{
               put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
               put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
               put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "3");
           }});

           runner.enqueue("Fragment 3 without count".getBytes("UTF-8"), new HashMap<String, String>() {{
               put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
               put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
           }});

           runner.run();

           runner.assertTransferCount(MergeContent.REL_MERGED, 1);
           final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
           assembled.assertContentEquals("Fragment 1 without count Fragment 2 with count Fragment 3 without count".getBytes("UTF-8"));
       }
   ```



##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java:
##########
@@ -869,7 +869,36 @@ public void testDefragment() throws IOException {
     }
 
     @Test
-    public void testDefragmentDuplicateFragement() throws IOException, InterruptedException {
+    public void testDefragmentWithFragmentCountOnLastFragmentOnly() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
+        runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
+        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
+
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
+        runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
+
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
+        runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
+
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
+        runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
+
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
+        attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");
+        runner.enqueue("Panama".getBytes("UTF-8"), attributes);
+
+        runner.run();
+
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
+    }
+
+

Review Comment:
   We might want to add a test for different fragment counts like this:
   ```java
       @Test
       public void testDefragmentWithDifferentFragmentCounts() throws IOException {
           final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
           runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
           runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");

           final String fragmentId = "Fragment Id";

           // Fragments 1 and 2 deliberately disagree on fragment.count (2 vs. 3).
           runner.enqueue("Fragment 1 with count 2 ".getBytes("UTF-8"), new HashMap<String, String>() {{
               put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
               put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
               put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "2");
           }});

           runner.enqueue("Fragment 2 with count 3 ".getBytes("UTF-8"), new HashMap<String, String>() {{
               put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
               put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
               put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "3");
           }});

           runner.enqueue("Fragment 3 without count".getBytes("UTF-8"), new HashMap<String, String>() {{
               put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
               put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
           }});

           runner.run();

           runner.assertTransferCount(MergeContent.REL_MERGED, 0);
           runner.assertTransferCount(MergeContent.REL_FAILURE, 3);
           runner.assertTransferCount(MergeContent.REL_ORIGINAL, 0);
       }
   ```


