Repository: nifi
Updated Branches:
  refs/heads/master eefad2916 -> b603cb955


http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
new file mode 100644
index 0000000..16c05ac
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.util.Optional;
+
+public class RecordBinThresholds {
+    private final int minRecords;
+    private final int maxRecords;
+    private final long minBytes;
+    private final long maxBytes;
+    private final long maxBinMillis;
+    private final String maxBinAge;
+    private final Optional<String> recordCountAttribute;
+
+    public RecordBinThresholds(final int minRecords, final int maxRecords, 
final long minBytes, final long maxBytes, final long maxBinMillis,
+        final String maxBinAge, final String recordCountAttribute) {
+        this.minRecords = minRecords;
+        this.maxRecords = maxRecords;
+        this.minBytes = minBytes;
+        this.maxBytes = maxBytes;
+        this.maxBinMillis = maxBinMillis;
+        this.maxBinAge = maxBinAge;
+        this.recordCountAttribute = Optional.ofNullable(recordCountAttribute);
+    }
+
+    public int getMinRecords() {
+        return minRecords;
+    }
+
+    public int getMaxRecords() {
+        return maxRecords;
+    }
+
+    public long getMinBytes() {
+        return minBytes;
+    }
+
+    public long getMaxBytes() {
+        return maxBytes;
+    }
+
+    public long getMaxBinMillis() {
+        return maxBinMillis;
+    }
+
+    public String getMaxBinAge() {
+        return maxBinAge;
+    }
+
+    public Optional<String> getRecordCountAttribute() {
+        return recordCountAttribute;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 09681da..aa8e931 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -62,6 +62,7 @@ org.apache.nifi.processors.standard.LogMessage
 org.apache.nifi.processors.standard.LookupAttribute
 org.apache.nifi.processors.standard.LookupRecord
 org.apache.nifi.processors.standard.MergeContent
+org.apache.nifi.processors.standard.MergeRecord
 org.apache.nifi.processors.standard.ModifyBytes
 org.apache.nifi.processors.standard.MonitorActivity
 org.apache.nifi.processors.standard.Notify

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeRecord/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeRecord/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeRecord/additionalDetails.html
new file mode 100644
index 0000000..ebd8856
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeRecord/additionalDetails.html
@@ -0,0 +1,229 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>MergeRecord</title>
+
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css" />
+    </head>
+
+    <body>
+
+        <h3>Introduction</h3>
+       <p>
+           The MergeRecord Processor allows the user to take many FlowFiles 
that consist of record-oriented data (any data format for which there is
+           a Record Reader available) and combine the FlowFiles into one 
larger FlowFile. This may be preferable before pushing the data to a downstream
+           system that prefers larger batches of data, such as HDFS, or in 
order to improve performance of a NiFi flow by reducing the number of FlowFiles
+           that flow through the system (thereby reducing the contention 
placed on the FlowFile Repository, Provenance Repository, Content Repository, 
and
+           FlowFile Queues).
+       </p>
+
+       <p>
+               The Processor creates several 'bins' to put the FlowFiles in. 
The maximum number of bins to use is set to 5 by default, but this can be 
changed
+               by updating the value of the &lt;Maximum number of Bins&gt; 
property. The number of bins is bound in order to avoid running out of Java 
heap space.
+               Note: while the contents of a FlowFile are stored in the 
Content Repository and not in the Java heap space, the Processor must hold the 
FlowFile
+               objects themselves in memory. As a result, these FlowFiles with 
their attributes can potentially take up a great deal of heap space and cause
+               OutOfMemoryError's to be thrown. In order to avoid this, if you 
expect to merge many small FlowFiles together, it is advisable to instead use a
+               MergeContent that merges no more than say 1,000 FlowFiles into 
a bundle and then use a second MergeContent to merges these small bundles into
+               larger bundles. For example, to merge 1,000,000 FlowFiles 
together, use MergeRecord that uses a &lt;Maximum Number of Records&gt; of 
1,000 and route the
+               "merged" Relationship to a second MergeRecord that also sets 
the &lt;Maximum Number of Records&gt; to 1,000. The second MergeRecord will 
then merge 1,000 bundles
+               of 1,000, which in effect produces bundles of 1,000,000.
+       </p>
+
+
+
+       <h3>How FlowFiles are Binned</h3>
+       <p>
+           How the Processor determines which bin to place a FlowFile in 
depends on a few different configuration options. Firstly, the Merge Strategy
+           is considered. The Merge Strategy can be set to one of two options: 
Bin Packing Algorithm, or Defragment. When the goal is to simply combine
+           smaller FlowFiles into one larger FlowFiles, the Bin Packing 
Algorithm should be used. This algorithm picks a bin based on whether or not 
the FlowFile
+           can fit in the bin according to its size and the &lt;Maximum Bin 
Size&gt; property and whether or not the FlowFile is 'like' the other FlowFiles 
in
+           the bin. What it means for two FlowFiles to be 'like FlowFiles' is 
discussed at the end of this section.
+       </p>
+       
+       <p>
+           The "Defragment" Merge Strategy can be used when records need to be 
explicitly assigned to the same bin. For example, if data is split apart using
+           the SplitRecord Processor, each 'split' can be processed 
independently and later merged back together using this Processor with the
+           Merge Strategy set to Defragment. In order for FlowFiles to be 
added to the same bin when using this configuration, the FlowFiles must have 
the same
+           value for the "fragment.identifier" attribute. Each FlowFile with 
the same identifier must also have the same value for the "fragment.count" 
attribute
+           (which indicates how many FlowFiles belong in the bin) and a unique 
value for the "fragment.index" attribute so that the FlowFiles can be ordered
+           correctly.
+       </p>
+       
+       <p>
+           In order to be added to the same bin, two FlowFiles must be 'like 
FlowFiles.' In order for two FlowFiles to be like FlowFiles, they must have the 
same
+           schema, and if the &lt;Correlation Attribute Name&gt; property is 
set, they must have the same value for the specified attribute. For example, if 
the
+           &lt;Correlation Attribute Name&gt; is set to "filename" then two 
FlowFiles must have the same value for the "filename" attribute in order to be 
binned
+           together. If more than one attribute is needed in order to 
correlate two FlowFiles, it is recommended to use an UpdateAttribute processor 
before the
+           MergeRecord processor and combine the attributes. For example, if 
the goal is to bin together two FlowFiles only if they have the same value for 
the
+           "abc" attribute and the "xyz" attribute, then we could accomplish 
this by using UpdateAttribute and adding a property with name 
"correlation.attribute"
+           and a value of "abc=${abc},xyz=${xyz}" and then setting 
MergeRecord's &lt;Correlation Attribute Name&gt; property to 
"correlation.attribute".
+       </p>
+       
+       <p>
+               It is often useful to bin together only Records that have the 
same value for some field. For example, if we have point-of-sale data, perhaps 
the desire
+               is to bin together records that belong to the same store, as 
identified by the 'storeId' field. This can be accomplished by making use of 
the PartitionRecord
+               Processor ahead of MergeRecord. This Processor will allow one 
or more fields to be configured as the partitioning criteria and will create 
attributes for those
+               corresponding values. An UpdateAttribute processor could then 
be used, if necessary, to combine multiple attributes into a single correlation 
attribute,
+               as described above. See documentation for those processors for 
more details.
+       </p>
+
+
+
+               <h3>When a Bin is Merged</h3>           
+       <p>
+           Above, we discussed how a bin is chosen for a given FlowFile. Once 
a bin has been created and FlowFiles added to it, we must have some way to 
determine
+           when a bin is "full" so that we can bin those FlowFiles together 
into a "merged" FlowFile. There are a few criteria that are used to make a 
determination as
+           to whether or not a bin should be merged.
+       </p>
+
+               <p>
+                   If the &lt;Merge Strategy&gt; property is set to "Bin 
Packing Algorithm" then then the following rules will be evaluated.
+                   Firstly, in order for a bin to be full, both of the 
thresholds specified by the &lt;Minimum Bin Size&gt; and the &lt;Minimum Number 
of Records&gt; properties
+                   must be satisfied. If one of these properties is not set, 
then it is ignored. Secondly, if either the &lt;Maximum Bin Size&gt; or the 
&lt;Maximum Number of
+                   Records&gt; property is reached, then the bin is merged. 
That is, both of the minimum values must be reached but only one of the maximum 
values need be reached.
+                   Note that the &lt;Maximum Number of Records&gt; property is 
a "soft limit," meaning that all records in a given input FlowFile will be 
added to the same bin, and
+                   as a result the number of records may exceed the maximum 
configured number of records. Once this happens, though, no more Records will 
be added to that same bin
+                   from another FlowFile.
+                   If the &lt;Max Bin Age&gt; is reached for a bin, then the 
FlowFiles in that bin will be merged, <b>even if</b> the minimum bin size and 
minimum number of records
+                   have not yet been met. Finally, if the maximum number of 
bins have been created (as specified by the &lt;Maximum number of Bins&gt; 
property), and some input FlowFiles
+                   cannot fit into any of the existing bins, then the oldest 
bin will be merged to make room. This is done because otherwise we would not be 
able to add any
+                   additional FlowFiles to the existing bins and would have to 
wait until the Max Bin Age is reached (if ever) in order to merge any FlowFiles.
+               </p>
+
+        <p>
+            If the &lt;Merge Strategy&gt; property is set to "Defragment" then 
a bin is full only when the number of FlowFiles in the bin is equal to the 
number specified
+            by the "fragment.count" attribute of one of the FlowFiles in the 
bin. All FlowFiles that have this attribute must have the same value for this 
attribute,
+            or else they will be routed to the "failure" relationship. It is 
not necessary that all FlowFiles have this value, but at least one FlowFile in 
the bin must have
+            this value or the bin will never be complete. If all of the 
necessary FlowFiles are not binned together by the point at which the bin times 
amount
+            (as specified by the &lt;Max Bin Age&gt; property), then the 
FlowFiles will all be routed to the 'failure' relationship instead of being 
merged together.
+        </p>
+
+        <p>
+            Once a bin is merged into a single FlowFile, it can sometimes be 
useful to understand why exactly the bin was merged when it was. For example, 
if the maximum number
+            of allowable bins is reached, a merged FlowFile may consist of far 
fewer records than expected. In order to help understand the behavior, the 
Processor will emit
+            a JOIN Provenance Events when creating the merged FlowFile, and 
the JOIN event will include in it a "Details" field that explains why the bin 
was merged when it was.
+            For example, the event will indicate "Records Merged due to: Bin 
is full" if the bin reached its minimum thresholds and no more subsequent 
FlowFiles were able to be
+            added to it. Or it may indicate "Records Merged due to: Maximum 
number of bins has been exceeded" if the bin was merged due to the configured 
maximum number of bins
+            being filled and needing to free up space for a new bin.
+        </p>
+
+
+       <h3>When a Failure Occurs</h3>
+       <p>
+           When a bin is filled, the Processor is responsible for merging 
together all of the records in those FlowFiles into a single FlowFile. If the 
Processor fails
+           to do so for any reason (for example, a Record cannot be read from 
an input FlowFile), then all of the FlowFiles in that bin are routed to the 
'failure'
+           Relationship. The Processor does not skip the single problematic 
FlowFile and merge the others. This behavior was chosen because of two 
different considerations.
+           Firstly, without those problematic records, the bin may not truly 
be full, as the minimum bin size may not be reached without those records.
+           Secondly, and more importantly, if the problematic FlowFile 
contains 100 "good" records before the problematic ones, those 100 records 
would already have been
+           written to the "merged" FlowFile. We cannot un-write those records. 
If we were to then send those 100 records on and route the problematic FlowFile 
to 'failure'
+           then in a situation where the "failure" relationship is eventually 
routed back to MergeRecord, we could end up continually duplicating those 100 
successfully
+           processed records.
+       </p>
+       
+       
+       
+       <h2>Examples</h2>
+       
+       <p>
+               To better understand how this Processor works, we will lay out 
a few examples. For the sake of simplicity of these examples, we will use 
CSV-formatted data and
+               write the merged data as CSV-formatted data, but
+               the format of the data is not really relevant, as long as there 
is a Record Reader that is capable of reading the data and a Record Writer 
capable of writing
+               the data in the desired format.
+       </p>
+
+
+
+       <h3>Example 1 - Batching Together Many Small FlowFiles</h3>
+       
+       <p>
+               When we want to batch together many small FlowFiles in order to 
create one larger FlowFile, we will accomplish this by using the "Bin Packing 
Algorithm"
+               Merge Strategy. The idea here is to bundle together as many 
FlowFiles as we can within our minimum and maximum number of records and bin 
size.
+               Consider that we have the following properties set:
+       </p>
+
+<table>
+  <tr>
+    <th>Property Name</th>
+    <th>Property Value</th>
+  </tr>
+  <tr>
+    <td>Merge Strategy</td>
+    <td>Bin Packing Algorithm</td>
+  </tr>
+  <tr>
+    <td>Minimum Number of Records</td>
+    <td>3</td>
+  </tr>
+  <tr>
+    <td>Maximum Number of Records</td>
+    <td>5</td>
+  </tr>
+</table>
+
+        <p>
+            Also consider that we have the following data on the queue, with 
the schema indicating a Name and an Age field:
+        </p>
+
+<table>
+  <tr>
+    <th>FlowFile ID</th>
+    <th>FlowFile Contents</th>
+  </tr>
+  <tr>
+    <td>1</td>
+    <td>Mark, 33</td>
+  </tr>
+  <tr>
+    <td>2</td>
+    <td>John, 45<br />Jane, 43</td>
+  </tr>
+  <tr>
+    <td>3</td>
+    <td>Jake, 3</td>
+  </tr>
+  <tr>
+    <td>4</td>
+    <td>Jan, 2</td>
+  </tr>
+</table>
+
+               <p>
+                       In this, because we have not configured a Correlation 
Attribute, and because all FlowFiles have the same schema, the Processor
+                       will attempt to add all of these FlowFiles to the same 
bin. Because the Minimum Number of Records is 3 and the Maximum Number of 
Records is 5,
+                       all of the FlowFiles will be added to the same bin. The 
output, then, is a single FlowFile with the following content:
+               </p>
+
+<code>
+<pre>
+Mark, 33
+John, 45
+Jane, 43
+Jake, 3
+Jan, 2
+</pre>
+</code>
+
+               <p>
+                  When the Processor runs, it will bin all of the FlowFiles 
that it can get from the queue. After that, it will merge any bin that is "full 
enough."
+                  So if we had only 3 FlowFiles on the queue, those 3 would 
have been added, and a new bin would have been created in the next iteration, 
once the
+                  4th FlowFile showed up. However, if we had 8 FlowFiles 
queued up, only 5 would have been added to the first bin. The other 3 would 
have been added
+                  to a second bin, and that bin would then be merged since it 
reached the minimum threshold of 3 also.
+               </p>
+
+       </body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index b8f9210..4cd4460 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.standard;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -48,8 +50,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.stream.io.ByteArrayInputStream;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
@@ -826,7 +827,7 @@ public class TestMergeContent {
     @Test
     public void testUniqueAttributes() {
         final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
-        runner.setProperty(MergeContent.ATTRIBUTE_STRATEGY, 
MergeContent.ATTRIBUTE_STRATEGY_ALL_UNIQUE);
+        runner.setProperty(AttributeStrategyUtil.ATTRIBUTE_STRATEGY, 
AttributeStrategyUtil.ATTRIBUTE_STRATEGY_ALL_UNIQUE);
         runner.setProperty(MergeContent.MAX_SIZE, "2 B");
         runner.setProperty(MergeContent.MIN_SIZE, "2 B");
 
@@ -856,7 +857,7 @@ public class TestMergeContent {
     @Test
     public void testCommonAttributesOnly() {
         final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
-        runner.setProperty(MergeContent.ATTRIBUTE_STRATEGY, 
MergeContent.ATTRIBUTE_STRATEGY_ALL_COMMON);
+        runner.setProperty(AttributeStrategyUtil.ATTRIBUTE_STRATEGY, 
AttributeStrategyUtil.ATTRIBUTE_STRATEGY_ALL_COMMON);
         runner.setProperty(MergeContent.MAX_SIZE, "2 B");
         runner.setProperty(MergeContent.MIN_SIZE, "2 B");
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
new file mode 100644
index 0000000..5ee4dd9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.CommaSeparatedRecordReader;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestMergeRecord {
+    private TestRunner runner;
+    private CommaSeparatedRecordReader readerService;
+    private MockRecordWriter writerService;
+
+    @Before
+    public void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(new MergeRecord());
+
+        readerService = new CommaSeparatedRecordReader();
+        writerService = new MockRecordWriter("header", false);
+
+        runner.addControllerService("reader", readerService);
+
+        runner.enableControllerService(readerService);
+
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(MergeRecord.RECORD_READER, "reader");
+        runner.setProperty(MergeRecord.RECORD_WRITER, "writer");
+    }
+
+    @Test
+    public void testMergeSimple() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "2");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "2");
+
+        runner.enqueue("Name, Age\nJohn, 35");
+        runner.enqueue("Name, Age\nJane, 34");
+
+        runner.run(2);
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
+
+        final MockFlowFile mff = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        mff.assertAttributeEquals("record.count", "2");
+        mff.assertContentEquals("header\nJohn,35\nJane,34\n");
+    }
+
+
+    // Verify that FlowFiles are grouped with like schemas.
+    @Test
+    public void testDifferentSchema() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "2");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "2");
+
+        runner.enqueue("Name, Age\nJohn, 35");
+        runner.enqueue("Name, Color\nJane, Red");
+
+        runner.run(2, false, true);
+
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+
+        runner.enqueue("Name, Age\nJane, 34");
+        runner.enqueue("Name, Color\nJohn, Blue");
+
+        runner.run(2, true, false);
+
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
+
+        final List<MockFlowFile> mffs = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
+        assertEquals(1L, mffs.stream()
+            .filter(ff -> "2".equals(ff.getAttribute("record.count")))
+            .filter(ff -> "header\nJohn,35\nJane,34\n".equals(new 
String(ff.toByteArray())))
+            .count());
+
+        assertEquals(1L, mffs.stream()
+            .filter(ff -> "2".equals(ff.getAttribute("record.count")))
+            .filter(ff -> "header\nJane,Red\nJohn,Blue\n".equals(new 
String(ff.toByteArray())))
+            .count());
+    }
+
+    @Test
+    public void testFailureToParse() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "2");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "3");
+
+        readerService.failAfter(2);
+
+        runner.enqueue("Name, Age\nJohn, 35");
+        runner.enqueue("Name, Age\nJane, 34");
+        runner.enqueue("Name, Age\nJake, 3");
+
+        runner.run();
+
+        // We have to route all of the FlowFiles in the same bin to the 
'failure' relationship.
+        // Otherwise, we may have read some of the records from the failing 
FlowFile and then
+        // routed it to failure, which would result in some of its records 
moving on and others not.
+        // This, in turn, would result in the same records being added to 
potentially many FlowFiles.
+        runner.assertAllFlowFilesTransferred(MergeRecord.REL_FAILURE, 3);
+    }
+
+    @Test
+    public void testDefragment() {
+        runner.setProperty(MergeRecord.MERGE_STRATEGY, 
MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
+
+        final Map<String, String> attr1 = new HashMap<>();
+        attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+
+        final Map<String, String> attr2 = new HashMap<>();
+        attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+
+        final Map<String, String> attr3 = new HashMap<>();
+        attr3.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr3.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2");
+
+        final Map<String, String> attr4 = new HashMap<>();
+        attr4.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr4.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2");
+
+        runner.enqueue("Name, Age\nJohn, 35", attr1);
+        runner.enqueue("Name, Age\nJane, 34", attr2);
+
+        runner.enqueue("Name, Age\nJake, 3", attr3);
+        runner.enqueue("Name, Age\nJan, 2", attr4);
+
+        runner.run(4);
+
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
+
+        final List<MockFlowFile> mffs = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
+        assertEquals(1L, mffs.stream()
+            .filter(ff -> "2".equals(ff.getAttribute("record.count")))
+            .filter(ff -> "header\nJohn,35\nJane,34\n".equals(new 
String(ff.toByteArray())))
+            .count());
+
+        assertEquals(1L, mffs.stream()
+            .filter(ff -> "2".equals(ff.getAttribute("record.count")))
+            .filter(ff -> "header\nJake,3\nJan,2\n".equals(new 
String(ff.toByteArray())))
+            .count());
+    }
+
+
+    @Test
+    public void testMinSize() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "2");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "2");
+        runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
+
+        runner.enqueue("Name, Age\nJohn, 35");
+        runner.enqueue("Name, Age\nJane, 34");
+
+        runner.run();
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+
+        final StringBuilder sb = new StringBuilder("Name, Age\n");
+        for (int i = 0; i < 100; i++) {
+            sb.append("Person " + i + ", " + i + "\n");
+        }
+        runner.enqueue(sb.toString());
+
+        runner.run();
+
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 3);
+    }
+
+    @Test
+    public void testMinRecords() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "103");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "2");
+        runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
+
+        runner.enqueue("Name, Age\nJohn, 35");
+        runner.enqueue("Name, Age\nJane, 34");
+
+        final StringBuilder sb = new StringBuilder("Name, Age\n");
+        for (int i = 0; i < 100; i++) {
+            sb.append("Person " + i + ", " + i + "\n");
+        }
+        runner.enqueue(sb.toString());
+
+        runner.run();
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+
+        runner.enqueue("Name, Age\nJohn, 35");
+        runner.run();
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
+    }
+
+    @Test
+    public void testMaxRecords() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "5");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "10");
+
+        for (int i = 0; i < 34; i++) {
+            runner.enqueue("Name, Age\nJohn, 35");
+        }
+
+        runner.run();
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 3);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 30);
+
+        assertEquals(4, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void testMaxSize() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "5");
+        runner.setProperty(MergeRecord.MAX_SIZE, "100 B");
+
+        for (int i = 0; i < 36; i++) {
+            runner.enqueue("Name, Age\nJohnny, 5");
+        }
+
+        runner.run();
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 3);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 33);
+
+        assertEquals(3, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
+    @Ignore("This unit test depends on timing and could potentially cause 
problems in an automated build environment. However, it can be useful for 
manual testing")
+    public void testTimeout() throws InterruptedException {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "500");
+        runner.setProperty(MergeRecord.MAX_BIN_AGE, "500 millis");
+
+        for (int i = 0; i < 100; i++) {
+            runner.enqueue("Name, Age\nJohnny, 5");
+        }
+
+        runner.run(1, false);
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+
+        Thread.sleep(750);
+        runner.run(1, true, false);
+
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 100);
+    }
+
+
+    @Test
+    public void testBinCount() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "5");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "10");
+        runner.setProperty(MergeRecord.MAX_BIN_COUNT, "5");
+        runner.setProperty(MergeRecord.CORRELATION_ATTRIBUTE_NAME, 
"correlationId");
+
+        final Map<String, String> attrs = new HashMap<>();
+        for (int i = 0; i < 5; i++) {
+            attrs.put("correlationId", String.valueOf(i));
+            runner.enqueue("Name, Age\nJohn, 3" + i, attrs);
+        }
+
+        runner.run(1, false);
+
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+        runner.assertTransferCount(MergeRecord.REL_FAILURE, 0);
+
+        attrs.put("correlationId", "5");
+        runner.enqueue("Name, Age\nJohn, 35", attrs);
+        assertEquals(5, ((MergeRecord) runner.getProcessor()).getBinCount());
+        runner.run(1, false, false);
+
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(MergeRecord.REL_FAILURE, 0);
+        assertEquals(5, ((MergeRecord) runner.getProcessor()).getBinCount());
+    }
+
+    @Test
+    public void testDefragmentOldestBinFailsWhenTooManyBins() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "5");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "10");
+        runner.setProperty(MergeRecord.MAX_BIN_COUNT, "5");
+        runner.setProperty(MergeRecord.MERGE_STRATEGY, 
MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "5");
+        for (int i = 0; i < 5; i++) {
+            attrs.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, String.valueOf(i));
+            runner.enqueue("Name, Age\nJohn, 3" + i, attrs);
+        }
+
+        runner.run(1, false);
+
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+        runner.assertTransferCount(MergeRecord.REL_FAILURE, 0);
+
+        attrs.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "5");
+        runner.enqueue("Name, Age\nJohn, 35", attrs);
+        assertEquals(5, ((MergeRecord) runner.getProcessor()).getBinCount());
+        runner.run(1, false, false);
+
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+        runner.assertTransferCount(MergeRecord.REL_FAILURE, 1);
+        assertEquals(5, ((MergeRecord) runner.getProcessor()).getBinCount());
+    }
+
+    @Test
+    public void testDefragmentExpiredBinFailsOnTimeout() throws 
InterruptedException {
+        runner.setProperty(MergeRecord.MAX_BIN_COUNT, "5");
+        runner.setProperty(MergeRecord.MERGE_STRATEGY, 
MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
+        runner.setProperty(MergeRecord.MAX_BIN_AGE, "1 millis");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "5");
+        attrs.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "0");
+        runner.enqueue("Name, Age\nJohn, 30", attrs);
+
+        runner.run(1, false);
+
+        Thread.sleep(50L);
+        runner.run(1, true, false);
+
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+        runner.assertTransferCount(MergeRecord.REL_FAILURE, 1);
+    }
+}

Reply via email to