This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch NIFI-6169-RC2 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 2bba5094f2dd0d9e9b69576527b37dfe89bb2264 Author: Koji Kawamura <[email protected]> AuthorDate: Wed Mar 20 12:16:16 2019 +0900 NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on Added a unit test representing the fixed issue. And updated existing testDefragment test to illustrate the remaining FlowFiles that did not meet the threshold. --- .../nifi/processors/standard/TestMergeRecord.java | 63 ++++++++++++++++++++-- 1 file changed, 59 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java index c54bf2a..3540b04 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java @@ -158,27 +158,39 @@ public class TestMergeRecord { final Map<String, String> attr1 = new HashMap<>(); attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1"); + attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0"); final Map<String, String> attr2 = new HashMap<>(); attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1"); + attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1"); final Map<String, String> attr3 = new HashMap<>(); attr3.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); attr3.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2"); + attr3.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0"); final Map<String, String> attr4 = new HashMap<>(); attr4.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); - attr4.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2"); + attr4.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "3"); + 
attr4.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0"); + + final Map<String, String> attr5 = new HashMap<>(); + attr5.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); + attr5.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "3"); + attr5.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1"); runner.enqueue("Name, Age\nJohn, 35", attr1); runner.enqueue("Name, Age\nJane, 34", attr2); - runner.enqueue("Name, Age\nJake, 3", attr3); - runner.enqueue("Name, Age\nJan, 2", attr4); + runner.enqueue("Name, Age\nJay, 24", attr3); + + runner.enqueue("Name, Age\nJake, 3", attr4); + runner.enqueue("Name, Age\nJan, 2", attr5); - runner.run(4); + runner.run(1); + assertEquals("Fragment id=2 should remain in the incoming connection", 1, runner.getQueueSize().getObjectCount()); runner.assertTransferCount(MergeRecord.REL_MERGED, 2); runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4); @@ -196,6 +208,49 @@ public class TestMergeRecord { @Test + public void testDefragmentWithMultipleRecords() { + runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT); + + final Map<String, String> attr1 = new HashMap<>(); + attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); + attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1"); + attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0"); + attr1.put("record.count", "2"); + + final Map<String, String> attr2 = new HashMap<>(); + attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); + attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1"); + attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1"); + attr2.put("record.count", "2"); + + final Map<String, String> attr3 = new HashMap<>(); + attr3.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2"); + attr3.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2"); + attr3.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0"); + attr3.put("record.count", "2"); + + runner.enqueue("Name, Age\nJohn, 35\nJane, 34", attr1); + + runner.enqueue("Name, Age\nJake, 3\nJan, 2", attr2); + + runner.enqueue("Name, Age\nJay, 24\nJade, 28", 
attr3); + + runner.run(1); + + assertEquals("Fragment id=2 should remain in the incoming connection", 1, runner.getQueueSize().getObjectCount()); + runner.assertTransferCount(MergeRecord.REL_MERGED, 1); + runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2); + + final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED); + assertEquals(1L, mffs.stream() + .filter(ff -> "4".equals(ff.getAttribute("record.count"))) + .filter(ff -> "header\nJohn,35\nJane,34\nJake,3\nJan,2\n".equals(new String(ff.toByteArray()))) + .count()); + + } + + + @Test public void testMinSize() { runner.setProperty(MergeRecord.MIN_RECORDS, "2"); runner.setProperty(MergeRecord.MAX_RECORDS, "2");
