gresockj commented on a change in pull request #5772:
URL: https://github.com/apache/nifi/pull/5772#discussion_r808155362



##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkEnrichment.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.enrichment.EnrichmentRole;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+@SupportsBatching
+@SideEffectFree
+@SeeAlso(JoinEnrichment.class)
+@WritesAttributes({
+    @WritesAttribute(attribute = "enrichment.group.id", description = "The 
Group ID to use in order to correlate the 'original' FlowFile with the 
'enrichment' FlowFile."),
+    @WritesAttribute(attribute = "enrichment.role", description = "The role to 
use for enrichment. This will either be ORIGINAL or ENRICHMENT.")
+})
+@CapabilityDescription("Used in conjunction with the JoinEnrichment processor, 
this processor is responsible for adding the attributes that are necessary for 
the JoinEnrichment processor to perform" +
+    " its function. Each incoming FlowFile will be cloned. The original 
FlowFile will have appropriate attributes added and then be transferred to the 
'original' relationship. The clone will have" +
+    " appropriate attributes added and then be routed to the 'enrichment' 
relationship. See the documentation for the JoinEnrichment processor (and 
especially its Additional Details) for more" +
+    " information on how these Processors work together and how to perform 
enrichment tasks in NiFi by using these Processors.")
+@Tags({"fork", "join", "enrich", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class ForkEnrichment extends AbstractProcessor {
+    static final String ENRICHMENT_ROLE = "enrichment.role";
+    static final String ENRICHMENT_GROUP_ID = "enrichment.group.id";
+
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+        .name("original")
+        .description("The incoming FlowFile will be routed to this 
relationship.")

Review comment:
       Perhaps also ", after adding appropriate attributes"

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.JoinEnrichment/additionalDetails.html
##########
@@ -0,0 +1,524 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8"/>
+    <title>JoinEnrichment</title>
+
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+</head>
+
+<body>
+
+<h3>Introduction</h3>
+
+<p>
+    The JoinEnrichment processor is designed to be used in conjunction with 
the <a 
href="../org.apache.nifi.processors.standard.ForkEnrichment/index.html">ForkEnrichment
 Processor</a>.
+    Used together, they provide a powerful mechanism for transforming data 
into a separate request payload for gathering enrichment data, gathering that 
enrichment data, optionally transforming
+    the enrichment data, and finally joining together the original payload 
with the enrichment data.
+</p>
+
+
+
+<h3>Typical Dataflow</h3>
+
+<p>
+    A typical dataflow for accomplishing this may look something like this:
+</p>
+
+<img src="fork-join-enrichment.png" style="height: 50%; width: 50%" />
+
+<p>
+    Here, we have a ForkEnrichment processor that is responsible for taking in 
a FlowFile and producing two copies of it: one to the "original" relationship 
and the other to the "enrichment"
+    relationship. Each copy will have its own set of attributes added to it.
+</p>
+
+<p>
+    Next, we have the "original" FlowFile being routed to the JoinEnrichment 
processor, while the "enrichment" FlowFile is routed in a different direction. 
Each of these FlowFiles will have an
+    attribute named "enrichment.group.id" with the same value. The 
JoinEnrichment processor then uses this information to correlate the two 
FlowFiles. The "enrichment.role" attribute will also
+    be added to each FlowFile but with a different value. The FlowFile routed 
to "original" will have an enrichment.role of ORIGINAL while the FlowFile 
routed to "enrichment" will have an
+    enrichment.role of ENRICHMENT.
+</p>
+
+<p>
+    The Processors that make up the "enrichment" path will vary from use case 
to use case. In this example, we use the
+    <a 
href="../org.apache.nifi.processors.standard.JoltTransformJSON/index.html">JoltTransformJSON</a>
 processor in order to transform our payload from the original payload into a 
payload that is
+    expected by our web service. We then use the <a 
href="../org.apache.nifi.processors.standard.InvokeHTTP/index.html">InvokeHTTP</a>
 processor in order to gather
+    enrichment data that is relevant to our use case. Other common processors 
to use in this path include
+    <a 
href="../org.apache.nifi.processors.standard.QueryRecord/index.html">QueryRecord</a>,
 <a 
href="../org.apache.nifi.processors.standard.UpdateRecord/index.html">UpdateRecord</a>,
+    <a 
href="../org.apache.nifi.processors.standard.ReplaceText/index.html">ReplaceText</a>,
 JoltTransformRecord, and ScriptedTransformRecord.
+    It is also a common use case to transform the response from the web 
service that is invoked via InvokeHTTP using one or more of these processors.
+</p>
+
+<p>
+    After the enrichment data has been gathered, it does us little good unless 
we are able to somehow combine our enrichment data back with our original 
payload.
+    To achieve this, we use the JoinEnrichment processor. It is responsible 
for combining records from both the "original" FlowFile and the "enrichment" 
FlowFile.
+</p>
+
+<p>
+    The JoinEnrichment Processor is configured with a separate RecordReader 
for the "original" FlowFile and for the "enrichment" FlowFile. This means that 
the original data and the
+    enrichment data can have entirely different schemas and can even be in 
different data formats. For example, our original payload may be CSV data, 
while our enrichment data is a JSON
+    payload. Because we make use of RecordReaders, this is entirely okay. The 
Processor also requires a RecordWriter to use for writing out the enriched 
payload (i.e., the payload that contains
+    the join of both the "original" and the "enrichment" data).
+</p>
+
+<p>
+    The JoinEnrichment Processor offers different strategies for how to 
combine the original records with the enrichment data. Each of these is 
explained here in some detail.
+</p>
+
+<h3>Wrapper</h3>
+The Wrapper strategy is the default. Each record in the original payload is 
expected to have a corresponding record in the enrichment payload. The output 
record will be a record with two
+fields: <code>original</code> and <code>enrichment</code>. Each of these will 
contain the <i>n</i>th record from the corresponding FlowFile. For example, if 
the original FlowFile has the following
+content:
+
+<pre><code>
+id, name, age
+28021, John Doe, 55
+832, Jane Doe, 22
+29201, Jake Doe, 23
+555, Joseph Doe, 2
+</code></pre>
+
+And our enrichment FlowFile has the following content:
+
+<pre><code>
+id, email
+28021, [email protected]
+832, [email protected]
+29201, [email protected]
+</code></pre>
+
+This strategy would produce output that looks like this (assuming a JSON 
Writer):
+<pre><code>
+[
+    {
+        "original": {
+            "id": 28021,
+            "name": "John Doe",
+            "age": 55
+        },
+        "enrichment": {
+            "id": 28021,
+            "email": "[email protected]"
+        }
+    }, {
+        "original": {
+            "id": 832,
+            "name": "Jane Doe",
+            "age": 22
+        },
+        "enrichment": {
+            "id": 832,
+            "email": "[email protected]"
+        }
+    }, {
+        "original": {
+            "id": 29201,
+            "name": "Jake Doe",
+            "age": 23
+        },
+        "enrichment": {
+            "id": 29201,
+            "email": "[email protected]"
+        }
+    }, {
+        "original": {
+            "id": 555,
+            "name": "Joseph Doe",
+            "age": 2
+        },
+        "enrichment": null
+    }
+]
+</code></pre>
+
+<p>
+    With this strategy, the first record of the original FlowFile is coupled 
together with the first record of the enrichment FlowFile. The second record of 
the original FlowFile is coupled
+    together with the second record of the enrichment FlowFile, and so on. If 
one of the FlowFiles has more records than the other, a <code>null</code> value 
will be used.
+</p>
+
+
+
+<h3>Insert Enrichment Fields</h3>
+
+<p>
+    The "Insert Enrichment Fields" strategy inserts all of the fields of the 
"enrichment" record into the original record. The records are correlated by 
their index in the FlowFile. That is,
+    the first record in the "enrichment" FlowFile is inserted into the first 
record in the "original" FlowFile. The second record of the "enrichment" 
FlowFile is inserted into the second
+    record of the "original" FlowFile and so on.
+</p>
+
+<p>
+    When this strategy is selected, the "Record Path" property is required. 
The Record Path is evaluated against the "original" record. Consider, for 
example, the following content for the
+    "original" FlowFile:
+</p>
+
+<pre><code>
+[{
+    "purchase": {
+        "customer": {
+            "loyaltyId": 48202,
+            "firstName": "John",
+            "lastName": "Doe"
+        },
+        "total": 48.28,
+        "items": [
+            {
+                "itemDescription": "book",
+                "price": 24.14,
+                "quantity": 2
+            }
+        ]
+    }
+}, {
+    "purchase": {
+        "customer": {
+            "loyaltyId": 5512,
+            "firstName": "Jane",
+            "lastName": "Doe"
+        },
+        "total": 121.44,
+        "items": [
+            {
+                "itemDescription": "book",
+                "price": 28.15,
+                "quantity": 4
+            }, {
+                "itemDescription": "inkpen",
+                "price": 4.42,
+                "quantity": 2
+            }
+        ]
+    }
+}]
+</code></pre>
+
+Joined using the following enrichment content:
+<pre><code>
+[
+    {
+        "customerDetails": {
+            "id": 48202,
+            "phone": "555-555-5555",
+            "email": "[email protected]"
+        }
+    }, {
+        "customerDetails": {
+            "id": 5512,
+            "phone": "555-555-5511",
+            "email": "[email protected]"
+        }
+    }
+]
+</code></pre>
+
+<p>
+    Let us then consider that a Record Path is used with a value of 
"/purchase/customer".
+    This would yield the following results:
+</p>
+
+<pre><code>
+[{
+    "purchase": {
+        "customer": {
+            "loyaltyId": 48202,
+            "firstName": "John",
+            "lastName": "Doe",
+            "customerDetails": {
+                "id": 48202,
+                "phone": "555-555-5555",
+                "email": "[email protected]"
+            }
+        },
+        "total": 48.28,
+        "items": [
+            {
+                "itemDescription": "book",
+                "price": 24.14,
+                "quantity": 2
+            }
+        ]
+    }
+}, {
+    "purchase": {
+        "customer": {
+            "loyaltyId": 5512,
+            "firstName": "Jane",
+            "lastName": "Doe",
+            "customerDetails": {
+                "id": 5512,
+                "phone": "555-555-5511",
+                "email": "[email protected]"
+            }
+        },
+        "total": 121.44,
+        "items": [
+            {
+                "itemDescription": "book",
+                "price": 28.15,
+                "quantity": 4
+            }, {
+                "itemDescription": "inkpen",
+                "price": 4.42,
+                "quantity": 2
+            }
+        ]
+    }
+}]
+</code></pre>
+
+
+
+<h3>SQL</h3>
+
+<p>
+The SQL strategy provides an important capability that differs from the 
others, in that it allows for correlating the records in the "original" 
FlowFile and the records in the "enrichment" FlowFile
+in ways other than index based. That is, the SQL-based strategy doesn't 
necessarily correlate the first record of the original FlowFile with the first 
record of the enrichment FlowFile. Instead, it
+allows the records to be correlated using standard SQL JOIN expressions.
+</p>
+
+<p>
+A common use case for this is to create a payload to query some web service. 
The response contains identifiers with additional information for enrichment, 
but the order of the records in the
+enrichment may not correspond to the order of the records in the original.
+</p>
+
+<p>
+As an example, consider the following original payload, in CSV:
+</p>
+
+<pre><code>
+id, name, age
+28021, John Doe, 55
+832, Jane Doe, 22
+29201, Jake Doe, 23
+555, Joseph Doe, 2
+</code></pre>
+
+<p>
+Additionally, consider the following payload for the enrichment data:
+</p>
+
+<pre><code>
+customer_id, customer_email, customer_name, customer_since
+555, [email protected], Joe Doe, 08/Dec/14
+832, [email protected], Mrs. Doe, 14/Nov/14
+28021, [email protected], John Doe, 22/Jan/22
+</code></pre>
+
+<p>
+When making use of the SQL strategy, we must provide a SQL SELECT statement to 
combine both our original data and our enrichment data into a single FlowFile. 
To do this, we treat our original
+FlowFile as its own table with the name "original" while we treat the 
enrichment data as its own table with the name "enrichment".
+</p>
+
+<p>
+Given this, we might combine all of the data using a simple query such as:
+</p>
+<pre><code>
+SELECT o.*, e.*
+FROM original o
+JOIN enrichment e
+ON o.id = e.customer_id
+</code></pre>
+
+<p>
+And this would provide the following output:
+</p>
+
+<pre><code>
+id, name, age, customer_id, customer_email, customer_name, customer_since
+28021, John Doe, 55, 28021, [email protected], John Doe, 22/Jan/22
+832, Jane Doe, 22, 832, [email protected], Mrs. Doe, 14/Nov/14
+555, Joseph Doe, 2, 555, [email protected], Joe Doe, 08/Dec/14
+</code></pre>
+
+<p>
+Note that in this case, the record for Jake Doe was removed because we used a 
JOIN, rather than an OUTER JOIN. We could instead use a LEFT OUTER JOIN to 
ensure that we retain all records from the
+original FlowFile and simply provide null values for any missing records in 
the enrichment:
+</p>
+
+<pre><code>
+SELECT o.*, e.*
+FROM original o
+LEFT OUTER JOIN enrichment e
+ON o.id = e.customer_id
+</code></pre>
+
+<p>
+Which would produce the following output:
+</p>
+
+<pre><code>
+id, name, age, customer_id, customer_email, customer_name, customer_since
+28021, John Doe, 55, 28021, [email protected], John Doe, 22/Jan/22
+832, Jane Doe, 22, 832, [email protected], Mrs. Doe, 14/Nov/14
+29201, Jake Doe, 23,,,,
+555, Joseph Doe, 2, 555, [email protected], Joe Doe, 08/Dec/14
+</code></pre>
+
+<p>
+But SQL is far more expressive than this, allowing us to write far more 
powerful expressions. In this case, we probably don't want both the "id" and 
"customer_id" fields, or the "name" and
+"customer_name" fields. Let's consider, though, that the enrichment provides 
the customer's preferred name instead of their legal name. We might want to 
drop the customer_since column, as it
+doesn't make sense for our use case. We might then change our SQL to the 
following:
+</p>
+
+<pre><code>
+SELECT o.id, o.name, e.customer_name AS preferred_name, o.age, 
e.customer_email AS email
+FROM original o
+LEFT OUTER JOIN enrichment e
+ON o.id = e.customer_id
+</code></pre>
+
+<p>
+And this will produce a more convenient output:
+</p>
+
+<pre><code>
+id, name, preferred_name, age, email
+28021, John Doe, John Doe, 55, [email protected]
+832, Jane Doe, Mrs. Doe, 22, [email protected]
+29201, Jake Doe,, 23,
+555, Joseph Doe, Joe Doe, 2, [email protected]
+</code></pre>
+
+<p>
+So we can see tremendous power from the SQL strategy. However, there is a very 
important consideration that must be taken into account when using the SQL 
strategy.
+</p>
+
+<p>
+<b>WARNING:</b> while the SQL strategy provides us great power, it may require 
significant amounts of heap. Depending on the query, the SQL engine may require 
buffering the contents of the entire
+"enrichment" FlowFile in memory, in Java's heap. Additionally, if the 
Processor is scheduled with multiple concurrent tasks, each of the tasks may 
hold the entire contents of the enrichment
+FlowFile in memory. This can lead to heap exhaustion and cause stability 
problems or OutOfMemoryErrors to occur.
+</p>
+
+<p>
+There are a couple of options that will help to mitigate these concerns.
+</p>
+<ol>
+    <li>
+        Split into smaller chunks. It is generally ill-advised to split 
Record-oriented data into many tiny FlowFiles, as NiFi tends to perform best 
with larger FlowFiles. The sweet spot for NiFi
+        tends to be around 300 KB to 3 MB in size. So we do not want to break 
a large FlowFile with 100,000 records into 100,000 FlowFiles each with 1 
record. It may be advantageous, though, before
+        the ForkEnrichment processor to break that FlowFile into 100 
FlowFiles, each 1,000 records; or 10 FlowFiles, each 10,000 records. This 
typically results in a smaller amount of enrichment
+        data so that we don't need to hold as much in memory.
+    </li>
+    <li>
+        Before the JoinEnrichment processor, trim the enrichment data to 
remove any fields that are not desirable. In the example above, we may have 
used QueryRecord, UpdateRecord,
+        JoltTransformRecord, or updated our schema in order to remove the 
"customer_since" field from the enrichment dataset. Because we didn't make use 
of the field, we could easily remove it
+        before the JoinEnrichment in order to reduce the size of the 
enrichment FlowFile and thereby reduce the amount of data held in memory.
+    </li>
+</ol>
+
+<p>
+It is also worth noting that the SQL strategy may result in reordering the 
records within the FlowFile, so it may be necessary to use an ORDER BY clause, 
etc. if the ordering is important.
+</p>
+
+
+
+
+<h3>Additional Memory Considerations</h3>
+
+<p>
+In addition to the warning above about using the SQL Join Strategy, there is 
another consideration to keep in mind in order to limit the amount of 
information that this Processor must keep in
+memory. While the Processor does not store the contents of all FlowFiles in 
memory, it does hold all FlowFiles' attributes in memory. As a result, the 
following points should be kept in mind when
+using this Processor.
+</p>
+
+<ol>
+    <li>
+        Avoid large attributes. FlowFile attributes should not be used to hold 
FlowFile content. Attributes are intended to be small, generally on the order 
of 100-200 characters. If
+        there are any large attributes, it is recommended that they be removed 
by using the
+        <a 
href="../org.apache.nifi.processors.attributes.UpdateAttribute/index.html">UpdateAttribute
 Processor</a> before the ForkEnrichment processor.
+    </li>
+    <li>
+        Avoid large numbers of attributes. While it is important to avoid 
creating large FlowFile attributes, it is just as important to avoid creating 
large numbers of attributes. Keeping 30
+        small attributes on a FlowFile is perfectly fine. Storing 300 
attributes, on the other hand, may occupy a significant amount of heap.
+    </li>
+    <li>
+        Limit backpressure. The JoinEnrichment Processor will pull into its 
own memory all of the incoming FlowFiles. As a result, it will be helpful to 
avoid providing a huge number of FlowFiles
+        to the Processor at any given time. This can be done by setting the 
backpressure limits to a smaller value. For instance, in the example above, the 
ForkEnrichment Processor is connected
+        directly to the JoinEnrichment Processor. We may want to limit the 
backpressure on this connection to 500 or 1,000 instead of the default 10,000. 
Doing so will limit the number of FlowFiles
+        that are allowed to be loaded into the JoinEnrichment Processor at one 
time.
+    </li>
+</ol>
+
+
+<h3>More Complex Joining Strategies</h3>
+<p>
+This Processor offers several strategies that can be used for correlating data 
together and joining records from two different FlowFiles into a single 
FlowFile. However, there are times
+when users may require more powerful capabilities than what is offered. We 
might, for example, want to use the information in an enrichment record to 
determine whether or not to null out a value in
+the corresponding original records.
+</p>
+
+<p>
+For such use cases, the recommended approach is to make use of the Wrapper 
strategy or the SQL strategy in order to combine the original and enrichment 
FlowFiles into a single FlowFile. Then,
+connect the "joined" relationship of this Processor to the most appropriate 
processor for further processing the data. For example, consider that we use 
the Wrapper strategy to produce output that
+looks like this:
+</p>
+
+<pre><code>
+{
+    "original": {
+        "id": 482028,
+        "name": "John Doe",
+        "ssn": "555-55-5555",
+        "phone": "555-555-5555",
+        "email": "[email protected]"
+    },
+    "enrichment": {
+        "country": "UK",
+        "allowsPII": false
+    }
+}
+</code></pre>
+
+<p>
+We might then use the ScriptedTransformRecord processor with a JSON RecordReader and a 
JSON RecordSetWriter to transform this. Using Groovy, our transformation may 
look something like this:
+</p>
+
+<pre><code>
+import org.apache.nifi.serialization.record.Record
+
+Record original = (Record) record.getValue("original")
+Record enrichment = (Record) record.getValue("enrichment")
+
+if (Boolean.TRUE != enrichment?.getAsBoolean("allowsPII")) {
+    original.setValue("ssn", null)
+    original.setValue("phone", null)
+    original.setValue("email", null)
+}
+
+return original
+</code></pre>
+
+<p>
+Which will produce for us the following output:
+</p>
+
+<pre><code>
+{
+  "id" : 482028,
+  "name" : "John Doe",
+  "ssn" : null,
+  "phone" : null,
+  "email" : null
+}
+</code></pre>
+
+<p>
+In this way, we have used information from the enrichment record to optionally 
transform the original record. We then return the original record, dropping the 
enrichment record altogether. In
+this way, we open up an infinite number of possibilities for transforming our 
original payload based on the content of the enrichment data that we have 
fetched based on that data.
+</p>
+
+</body>
+</html>

Review comment:
       Nicely done with these additional details
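
For readers of this thread, the index-based pairing that the Wrapper section describes can be sketched roughly as below. This is only an illustration under stated assumptions, not the PR's `WrapperJoinStrategy`: the class `WrapperJoinSketch` and the method `wrap` are hypothetical, and records are modelled as plain maps rather than NiFi `Record` objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; plain maps stand in for NiFi Record objects.
class WrapperJoinSketch {

    // Pairs the n-th original record with the n-th enrichment record.
    // When one list is shorter, the missing side is filled with null.
    static List<Map<String, Object>> wrap(final List<Map<String, Object>> original,
                                          final List<Map<String, Object>> enrichment) {
        final int size = Math.max(original.size(), enrichment.size());
        final List<Map<String, Object>> joined = new ArrayList<>(size);

        for (int i = 0; i < size; i++) {
            final Map<String, Object> wrapper = new LinkedHashMap<>();
            wrapper.put("original", i < original.size() ? original.get(i) : null);
            wrapper.put("enrichment", i < enrichment.size() ? enrichment.get(i) : null);
            joined.add(wrapper);
        }

        return joined;
    }
}
```

With four original records and three enrichment records, as in the documentation example, the fourth output record would carry a null `enrichment` field.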

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java
##########
@@ -86,7 +91,18 @@ public Record nextRecord(final boolean coerceTypes, final 
boolean dropUnknown) t
                 int i = 0;
                 for (final RecordField field : fields) {
                     final String fieldName = field.getFieldName();
-                    valueMap.put(fieldName, values[i++].trim());
+                    if (i >= values.length) {
+                        valueMap.put(fieldName, null);
+                    } else {
+                        final String trimmed = values[i].trim();
+                        if (useNullForEmptyString && trimmed.isEmpty()) {
+                            valueMap.put(fieldName, null);
+                        } else {
+                            valueMap.put(fieldName, values[i].trim());

Review comment:
       Pretty minor, but you could use `trimmed` here
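
A minimal sketch of that suggestion, written as a standalone method rather than the actual `CommaSeparatedRecordReader` code. The class `TrimmedFieldSketch`, the method `toValueMap`, and its parameters are hypothetical; they only mirror the names visible in the diff above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the loop in the diff; not the real reader class.
class TrimmedFieldSketch {

    // Maps each field name to its trimmed value. A value that is missing,
    // or empty when useNullForEmptyString is set, is stored as null.
    static Map<String, Object> toValueMap(final List<String> fieldNames,
                                          final String[] values,
                                          final boolean useNullForEmptyString) {
        final Map<String, Object> valueMap = new LinkedHashMap<>();
        int i = 0;
        for (final String fieldName : fieldNames) {
            if (i >= values.length) {
                valueMap.put(fieldName, null);
            } else {
                final String trimmed = values[i].trim();
                if (useNullForEmptyString && trimmed.isEmpty()) {
                    valueMap.put(fieldName, null);
                } else {
                    // Reuse the already-trimmed value instead of trimming twice.
                    valueMap.put(fieldName, trimmed);
                }
            }
            i++;
        }
        return valueMap;
    }
}
```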

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java
##########
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.bin.Bin;
+import org.apache.nifi.processor.util.bin.BinFiles;
+import org.apache.nifi.processor.util.bin.BinManager;
+import org.apache.nifi.processor.util.bin.BinProcessingResult;
+import org.apache.nifi.processor.util.bin.EvictionReason;
+import org.apache.nifi.processors.standard.enrichment.EnrichmentRole;
+import 
org.apache.nifi.processors.standard.enrichment.InsertRecordFieldsJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinInput;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinResult;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.SqlJoinCache;
+import org.apache.nifi.processors.standard.enrichment.SqlJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.WrapperJoinStrategy;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.db.JdbcProperties;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@TriggerWhenEmpty
+@SideEffectFree
+@Tags({"fork", "join", "enrichment", "record", "sql", "wrap", "recordpath", 
"merge", "combine", "streams"})
+@CapabilityDescription("Joins together Records from two different FlowFiles 
where one FlowFile, the 'original' contains arbitrary records and the second 
FlowFile, the 'enrichment' contains " +
+    "additional data that should be used to enrich the first. See Additional 
Details for more information on how to configure this processor and the 
different use cases that it aims to accomplish.")
+@SeeAlso(ForkEnrichment.class)
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+    @WritesAttribute(attribute = "record.count", description = "The number of 
records in the FlowFile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"This Processor will load into heap all FlowFiles that are on its incoming 
queues. While it loads the FlowFiles " +
+    "themselves, and not their content, the FlowFile attributes can be very 
memory intensive. Additionally, if the Join Strategy is set to SQL, the SQL 
engine may require buffering the entire " +
+    "contents of the enrichment FlowFile for each concurrent task. See 
Processor's Additional Details for more details and for steps on how to 
mitigate these concerns.")
+public class JoinEnrichment extends BinFiles {
+    static final String GROUP_ID_ATTRIBUTE = "enrichment.group.id";
+    static final String ENRICHMENT_ROLE_ATTRIBUTE = "enrichment.role";
+    static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+    static final AllowableValue JOIN_WRAPPER = new AllowableValue("Wrapper", 
"Wrapper", "The output is a Record that contains two fields: (1) 'original', 
containing the Record from the original " +
+        "FlowFile and (2) 'enrichment' containing the corresponding Record 
from the enrichment FlowFile. Records will be correlated based on their index 
in the FlowFile. If one FlowFile has more " +
+        "Records than the other, a null value will be used.");
+    static final AllowableValue JOIN_SQL = new AllowableValue("SQL", "SQL", 
"The output is derived by evaluating a SQL SELECT statement that allows for two 
tables: 'original' and 'enrichment'. This" +
+        " allows for SQL JOIN statements to be used in order to correlate the 
Records of the two FlowFiles, so the index in which the Record is encountered 
in the FlowFile does not matter.");
+    static final AllowableValue JOIN_INSERT_ENRICHMENT_FIELDS = new 
AllowableValue("Insert Enrichment Fields", "Insert Enrichment Fields",
+        "The enrichment is joined together with the original FlowFile by 
placing all fields of the enrichment Record into the corresponding Record from 
the original FlowFile. " +
+            "Records will be correlated based on their index in the 
FlowFile.");
+
+    static final PropertyDescriptor ORIGINAL_RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("Original Record Reader")
+        .displayName("Original Record Reader")
+        .description("The Record Reader for reading the 'original' FlowFile")
+        .required(true)
+        .identifiesControllerService(RecordReaderFactory.class)
+        .build();
+    static final PropertyDescriptor ENRICHMENT_RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("Enrichment Record Reader")
+        .displayName("Enrichment Record Reader")
+        .description("The Record Reader for reading the 'enrichment' FlowFile")
+        .required(true)
+        .identifiesControllerService(RecordReaderFactory.class)
+        .build();
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+        .name("Record Writer")
+        .displayName("Record Writer")
+        .description("The Record Writer to use for writing the results. If the 
Record Writer is configured to inherit the schema from the Record, the schema 
that it will inherit will be the result " +
+            "of merging both the 'original' record schema and the 'enrichment' 
record schema.")
+        .required(true)
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .build();
+
+    // TODO: Include in docs: What to do if we need something more complex 
like a script: Use Wrapper strategy to correlate the records and then use a 
ScriptedTransformRecord.
+    static final PropertyDescriptor JOIN_STRATEGY = new 
PropertyDescriptor.Builder()
+        .name("Join Strategy")
+        .displayName("Join Strategy")
+        .description("Specifies how to join the two FlowFiles into a single 
FlowFile")
+        .required(true)
+        .allowableValues(JOIN_WRAPPER, JOIN_SQL, JOIN_INSERT_ENRICHMENT_FIELDS)
+        .defaultValue(JOIN_WRAPPER.getValue())
+        .build();
+    static final PropertyDescriptor SQL = new PropertyDescriptor.Builder()
+        .name("SQL")
+        .displayName("SQL")
+        .description("The SQL SELECT statement to evaluate. Expression 
Language may be provided, but doing so may result in poorer performance. 
Because this Processor is dealing with two FlowFiles " +
+            "at a time, it's also important to understand how attributes will 
be referenced. If both FlowFiles have an attribute with the same name but 
different values, the Expression Language " +
+            "will resolve to the value provided by the 'enrichment' FlowFile.")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .dependsOn(JOIN_STRATEGY, JOIN_SQL)
+        .defaultValue("SELECT original.*, enrichment.* \nFROM original \nLEFT 
OUTER JOIN enrichment \nON original.id = enrichment.id")
+        .build();
+    static final PropertyDescriptor DEFAULT_PRECISION = new 
PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(JdbcProperties.DEFAULT_PRECISION)
+        .required(false)
+        .dependsOn(JOIN_STRATEGY, JOIN_SQL)
+        .build();
+    static final PropertyDescriptor DEFAULT_SCALE = new 
PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(JdbcProperties.DEFAULT_SCALE)
+        .required(false)
+        .dependsOn(JOIN_STRATEGY, JOIN_SQL)
+        .build();
+    static final PropertyDescriptor INSERTION_RECORD_PATH = new 
PropertyDescriptor.Builder()
+        .name("Record Path")
+        .displayName("Record Path")

Review comment:
       Do you think "Record Path" will be descriptive enough, or should we call 
this "Insertion Record Path"?

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.JoinEnrichment/additionalDetails.html
##########
@@ -0,0 +1,524 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8"/>
+    <title>JoinEnrichment</title>
+
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+</head>
+
+<body>
+
+<h3>Introduction</h3>
+
+<p>
+    The JoinEnrichment processor is designed to be used in conjunction with 
the <a 
href="../org.apache.nifi.processors.standard.ForkEnrichment/index.html">ForkEnrichment
 Processor</a>.
+    Used together, they provide a powerful mechanism for transforming data 
into a separate request payload for gathering enrichment data, gathering that 
enrichment data, optionally transforming
+    the enrichment data, and finally joining together the original payload 
with the enrichment data.
+</p>
+
+
+
+<h3>Typical Dataflow</h3>
+
+<p>
+    A typical dataflow for accomplishing this may look something like this:
+</p>
+
+<img src="fork-join-enrichment.png" style="height: 50%; width: 50%" />
+
+<p>
+    Here, we have a ForkEnrichment processor that is responsible for taking in 
a FlowFile and producing two copies of it: one to the "original" relationship 
and the other to the "enrichment"
+    relationship. Each copy will have its own set of attributes added to it.
+</p>
+
+<p>
+    Next, we have the "original" FlowFile being routed to the JoinEnrichment 
processor, while the "enrichment" FlowFile is routed in a different direction. 
Each of these FlowFiles will have an
+    attribute named "enrichment.group.id" with the same value. The 
JoinEnrichment processor then uses this information to correlate the two 
FlowFiles. The "enrichment.role" attribute will also
+    be added to each FlowFile but with a different value. The FlowFile routed 
to "original" will have an enrichment.role of ORIGINAL while the FlowFile 
routed to "enrichment" will have an
+    enrichment.role of ENRICHMENT.
+</p>
+
+<p>
+    The Processors that make up the "enrichment" path will vary from use case 
to use case. In this example, we use the
+    <a 
href="../org.apache.nifi.processors.standard.JoltTransformJSON/index.html">JoltTransformJSON</a>
 processor in order to transform our payload from the original payload into a 
payload that is
+    expected by our web service. We then use the <a 
href="../org.apache.nifi.processors.standard.InvokeHTTP/index.html">InvokeHTTP</a>
 processor in order to gather
+    enrichment data that is relevant to our use case. Other common processors 
to use in this path include
+    <a 
href="../org.apache.nifi.processors.standard.QueryRecord/index.html">QueryRecord</a>,
 <a 
href="../org.apache.nifi.processors.standard.UpdateRecord/index.html">UpdateRecord</a>,
+    <a 
href="../org.apache.nifi.processors.standard.ReplaceText/index.html">ReplaceText</a>,
 JoltTransformRecord, and ScriptedTransformRecord.
+    It is also a common use case to transform the response from the web 
service that is invoked via InvokeHTTP using one or more of these processors.
+</p>
+
+<p>
+    After the enrichment data has been gathered, it does us little good unless 
we are able to somehow combine our enrichment data back with our original 
payload.
+    To achieve this, we use the JoinEnrichment processor. It is responsible 
for combining records from both the "original" FlowFile and the "enrichment" 
FlowFile.
+</p>
+
+<p>
+    The JoinEnrichment Processor is configured with a separate RecordReader 
for the "original" FlowFile and for the "enrichment" FlowFile. This means that 
the original data and the
+    enrichment data can have entirely different schemas and can even be in 
different data formats. For example, our original payload may be CSV data, 
while our enrichment data is a JSON
+    payload. Because we make use of RecordReaders, this is entirely okay. The 
Processor also requires a RecordWriter to use for writing out the enriched 
payload (i.e., the payload that contains
+    the join of both the "original" and the "enrichment" data).
+</p>
+
+<p>
+    The JoinEnrichment Processor offers different strategies for how to 
combine the original records with the enrichment data. Each of these is 
explained here in some detail.
+</p>
+
+<h3>Wrapper</h3>
+The Wrapper strategy is the default. Each record in the original payload is 
expected to have a corresponding record in the enrichment payload. The output 
record will be a record with two
+fields: <code>original</code> and <code>enrichment</code>. Each of these will 
contain the <i>n</i>th record from the corresponding FlowFile. For example, if 
the original FlowFile has the following
+content:
+
+<pre><code>
+id, name, age
+28021, John Doe, 55
+832, Jane Doe, 22
+29201, Jake Doe, 23
+555, Joseph Doe, 2
+</code></pre>
+
+And our enrichment FlowFile has the following content:
+
+<pre><code>
+id, email
+28021, [email protected]
+832, [email protected]
+29201, [email protected]
+</code></pre>
+
+This strategy would produce output that looks like this (assuming a JSON 
Writer):
+<pre><code>
+[
+    {
+        "original": {
+            "id": 28021,
+            "name": "John Doe",
+            "age": 55
+        },
+        "enrichment": {
+            "id": 28021,
+            "email": "[email protected]"
+        }
+    }, {
+        "original": {
+            "id": 832,
+            "name": "Jane Doe",
+            "age": 22
+        },
+        "enrichment": {
+            "id": 832,
+            "email": "[email protected]"

Review comment:
       jane*

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/IndexCorrelatedJoinStrategy.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.enrichment;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public abstract class IndexCorrelatedJoinStrategy implements 
RecordJoinStrategy {
+    private final ComponentLog logger;
+
+    public IndexCorrelatedJoinStrategy(final ComponentLog logger) {
+        this.logger = logger;
+    }
+
+    protected ComponentLog getLogger() {
+        return logger;
+    }
+
+    @Override
+    public RecordJoinResult join(final RecordJoinInput originalInput, final 
RecordJoinInput enrichmentInput, final ProcessSession session, final 
RecordSchema writerSchema) throws Exception {
+        final FlowFile originalFlowFile = originalInput.getFlowFile();
+        final FlowFile enrichmentFlowFile = enrichmentInput.getFlowFile();
+
+        InputStream originalIn = null;
+        RecordReader originalRecordReader = null;
+        InputStream enrichmentIn = null;
+        RecordReader enrichmentRecordReader = null;
+
+        try {
+            originalIn = session.read(originalFlowFile);
+            originalRecordReader = 
originalInput.getRecordReaderFactory().createRecordReader(originalFlowFile, 
originalIn, logger);
+
+            enrichmentIn = session.read(enrichmentFlowFile);
+            enrichmentRecordReader = 
enrichmentInput.getRecordReaderFactory().createRecordReader(enrichmentFlowFile, 
enrichmentIn, logger);
+
+            final Record firstOriginalRecord = 
originalRecordReader.nextRecord();
+            final Record firstEnrichmentRecord = 
enrichmentRecordReader.nextRecord();
+
+            final RecordSchema resultSchema = 
createResultSchema(firstOriginalRecord, firstEnrichmentRecord);
+
+            final InputStream finalOriginalIn = originalIn;
+            final RecordReader finalOriginalRecordReader = 
originalRecordReader;
+            final InputStream finalEnrichmentIn = enrichmentIn;
+            final RecordReader finalEnrichmentRecordReader = 
enrichmentRecordReader;
+
+            final RecordSet recordSet = new RecordSet() {
+                private boolean usedFirstRecords = false;
+
+                @Override
+                public RecordSchema getSchema() {
+                    return resultSchema;
+                }
+
+                @Override
+                public Record next() throws IOException {
+                    if (!usedFirstRecords) {

Review comment:
       This is probably something obvious I'm missing, but why do we have the 
first vs. final distinction here?
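
One possible reading, offered as a sketch and not as the author's answer: the `first*` variables hold records that were read eagerly so the result schema can be built before anything is written, and the `final*` variables are plain final copies, needed because an anonymous class can only capture locals that are final or effectively final (the originals are declared as null and assigned later). The example below mimics that shape with plain iterators. `FirstRecordReplaySketch`, `PairSource`, `join`, and `drain` are hypothetical names for illustration, not NiFi APIs, and the padding of the shorter side with null is an assumption taken from the Wrapper strategy description.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class FirstRecordReplaySketch {

    /** Hypothetical stand-in for RecordSet: returns the next joined row, or null when exhausted. */
    interface PairSource {
        String next();
    }

    static PairSource join(final List<String> original, final List<String> enrichment) {
        // Declared first and assigned afterwards, so neither local is effectively final.
        Iterator<String> originalIt = null;
        Iterator<String> enrichmentIt = null;
        originalIt = original.iterator();
        enrichmentIt = enrichment.iterator();

        // Read the first element of each side eagerly; in the diff this feeds createResultSchema(...).
        final String firstOriginal = originalIt.hasNext() ? originalIt.next() : null;
        final String firstEnrichment = enrichmentIt.hasNext() ? enrichmentIt.next() : null;

        // Final copies so the anonymous class below is allowed to capture the iterators.
        final Iterator<String> finalOriginalIt = originalIt;
        final Iterator<String> finalEnrichmentIt = enrichmentIt;

        return new PairSource() {
            private boolean usedFirstRecords = false;

            @Override
            public String next() {
                final String o;
                final String e;
                if (!usedFirstRecords) {
                    // Replay the records that were already consumed for the schema.
                    usedFirstRecords = true;
                    o = firstOriginal;
                    e = firstEnrichment;
                } else {
                    o = finalOriginalIt.hasNext() ? finalOriginalIt.next() : null;
                    e = finalEnrichmentIt.hasNext() ? finalEnrichmentIt.next() : null;
                }
                // Both sides exhausted: signal the end of the set.
                if (o == null && e == null) {
                    return null;
                }
                return o + "|" + e;
            }
        };
    }

    /** Collects every row from the source into a list. */
    static List<String> drain(final PairSource source) {
        final List<String> rows = new ArrayList<>();
        for (String row = source.next(); row != null; row = source.next()) {
            rows.add(row);
        }
        return rows;
    }

    public static void main(final String[] args) {
        System.out.println(drain(join(Arrays.asList("a", "b", "c"), Arrays.asList("x", "y"))));
    }
}
```

If this reading is right, a short comment next to the `final*` copies in the diff, or renaming them, might make the intent clearer to future readers.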

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java
##########
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.bin.Bin;
+import org.apache.nifi.processor.util.bin.BinFiles;
+import org.apache.nifi.processor.util.bin.BinManager;
+import org.apache.nifi.processor.util.bin.BinProcessingResult;
+import org.apache.nifi.processor.util.bin.EvictionReason;
+import org.apache.nifi.processors.standard.enrichment.EnrichmentRole;
+import 
org.apache.nifi.processors.standard.enrichment.InsertRecordFieldsJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinInput;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinResult;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.SqlJoinCache;
+import org.apache.nifi.processors.standard.enrichment.SqlJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.WrapperJoinStrategy;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.db.JdbcProperties;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@TriggerWhenEmpty
+@SideEffectFree
+@Tags({"fork", "join", "enrichment", "record", "sql", "wrap", "recordpath", 
"merge", "combine", "streams"})
+@CapabilityDescription("Joins together Records from two different FlowFiles 
where one FlowFile, the 'original' contains arbitrary records and the second 
FlowFile, the 'enrichment' contains " +
+    "additional data that should be used to enrich the first. See Additional 
Details for more information on how to configure this processor and the 
different use cases that it aims to accomplish.")
+@SeeAlso(ForkEnrichment.class)
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+    @WritesAttribute(attribute = "record.count", description = "The number of 
records in the FlowFile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"This Processor will load into heap all FlowFiles that are on its incoming 
queues. While it loads the FlowFiles " +
+    "themselves, and not their content, the FlowFile attributes can be very 
memory intensive. Additionally, if the Join Strategy is set to SQL, the SQL 
engine may require buffering the entire " +
+    "contents of the enrichment FlowFile for each concurrent task. See 
Processor's Additional Details for more details and for steps on how to 
mitigate these concerns.")
+public class JoinEnrichment extends BinFiles {
+    static final String GROUP_ID_ATTRIBUTE = "enrichment.group.id";
+    static final String ENRICHMENT_ROLE_ATTRIBUTE = "enrichment.role";
+    static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+    static final AllowableValue JOIN_WRAPPER = new AllowableValue("Wrapper", 
"Wrapper", "The output is a Record that contains two fields: (1) 'original', 
containing the Record from the original " +
+        "FlowFile and (2) 'enrichment' containing the corresponding Record 
from the enrichment FlowFile. Records will be correlated based on their index 
in the FlowFile. If one FlowFile has more " +
+        "Records than the other, a null value will be used.");
+    static final AllowableValue JOIN_SQL = new AllowableValue("SQL", "SQL", 
"The output is derived by evaluating a SQL SELECT statement that allows for two 
tables: 'original' and 'enrichment'. This" +
+        " allows for SQL JOIN statements to be used in order to correlate the 
Records of the two FlowFiles, so the index in which the Record is encountered 
in the FlowFile does not matter.");
+    static final AllowableValue JOIN_INSERT_ENRICHMENT_FIELDS = new 
AllowableValue("Insert Enrichment Fields", "Insert Enrichment Fields",
+        "The enrichment is joined together with the original FlowFile by 
placing all fields of the enrichment Record into the corresponding Record from 
the original FlowFile. " +
+            "Records will be correlated based on their index in the 
FlowFile.");
+
+    static final PropertyDescriptor ORIGINAL_RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("Original Record Reader")
+        .displayName("Original Record Reader")
+        .description("The Record Reader for reading the 'original' FlowFile")
+        .required(true)
+        .identifiesControllerService(RecordReaderFactory.class)
+        .build();
+    static final PropertyDescriptor ENRICHMENT_RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("Enrichment Record Reader")
+        .displayName("Enrichment Record Reader")
+        .description("The Record Reader for reading the 'enrichment' FlowFile")
+        .required(true)
+        .identifiesControllerService(RecordReaderFactory.class)
+        .build();
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+        .name("Record Writer")
+        .displayName("Record Writer")
+        .description("The Record Writer to use for writing the results. If the 
Record Writer is configured to inherit the schema from the Record, the schema 
that it will inherit will be the result " +
+            "of merging both the 'original' record schema and the 'enrichment' 
record schema.")
+        .required(true)
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .build();
+
+    // TODO: Include in docs: What to do if we need something more complex 
like a script: Use Wrapper strategy to correlate the records and then use a 
ScriptedTransformRecord.
+    static final PropertyDescriptor JOIN_STRATEGY = new 
PropertyDescriptor.Builder()
+        .name("Join Strategy")
+        .displayName("Join Strategy")
+        .description("Specifies how to join the two FlowFiles into a single 
FlowFile")
+        .required(true)
+        .allowableValues(JOIN_WRAPPER, JOIN_SQL, JOIN_INSERT_ENRICHMENT_FIELDS)
+        .defaultValue(JOIN_WRAPPER.getValue())
+        .build();
+    static final PropertyDescriptor SQL = new PropertyDescriptor.Builder()
+        .name("SQL")
+        .displayName("SQL")
+        .description("The SQL SELECT statement to evaluate. Expression 
Language may be provided, but doing so may result in poorer performance. 
Because this Processor is dealing with two FlowFiles " +
+            "at a time, it's also important to understand how attributes will 
be referenced. If both FlowFiles have an attribute with the same name but 
different values, the Expression Language " +
+            "will resolve to the value provided by the 'enrichment' FlowFile.")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .dependsOn(JOIN_STRATEGY, JOIN_SQL)
+        .defaultValue("SELECT original.*, enrichment.* \nFROM original \nLEFT 
OUTER JOIN enrichment \nON original.id = enrichment.id")
+        .build();
+    static final PropertyDescriptor DEFAULT_PRECISION = new 
PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(JdbcProperties.DEFAULT_PRECISION)
+        .required(false)
+        .dependsOn(JOIN_STRATEGY, JOIN_SQL)
+        .build();
+    static final PropertyDescriptor DEFAULT_SCALE = new 
PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(JdbcProperties.DEFAULT_SCALE)
+        .required(false)
+        .dependsOn(JOIN_STRATEGY, JOIN_SQL)
+        .build();
+    static final PropertyDescriptor INSERTION_RECORD_PATH = new 
PropertyDescriptor.Builder()
+        .name("Record Path")
+        .displayName("Record Path")
+        .description("Specifies where in the 'original' Record the 
'enrichment' Record's fields should be inserted. Note that if the RecordPath " +
+            "does not point to any existing field in the original Record, the 
enrichment will not be inserted.")
+        .required(true)
+        .addValidator(new RecordPathValidator())
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .dependsOn(JOIN_STRATEGY, JOIN_INSERT_ENRICHMENT_FIELDS)
+        .build();
+
+    static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+        .name("Max Bin Age")
+        .displayName("Max Bin Age")
+        .description("Once one of the FlowFiles to be merged arrives at the 
processor, specifies the maximum amount of time to wait for the other before 
timing out and routing the one that has " +

Review comment:
       "Specifies the maximum amount of time to wait for the second FlowFile 
once the first arrives at the processor, after which point the first will be 
routed to the 'timeout' relationship."

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.JoinEnrichment/additionalDetails.html
##########
@@ -0,0 +1,524 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8"/>
+    <title>JoinEnrichment</title>
+
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+</head>
+
+<body>
+
+<h3>Introduction</h3>
+
+<p>
+    The JoinEnrichment processor is designed to be used in conjunction with 
the <a 
href="../org.apache.nifi.processors.standard.ForkEnrichment/index.html">ForkEnrichment
 Processor</a>.
+    Used together, they provide a powerful mechanism for transforming data 
into a separate request payload for gathering enrichment data, gathering that 
enrichment data, optionally transforming
+    the enrichment data, and finally joining together the original payload 
with the enrichment data.
+</p>
+
+
+
+<h3>Typical Dataflow</h3>
+
+<p>
+    A typical dataflow for accomplishing this may look something like this:
+</p>
+
+<img src="fork-join-enrichment.png" style="height: 50%; width: 50%" />
+
+<p>
+    Here, we have a ForkEnrichment processor that is responsible for taking in 
a FlowFile and producing two copies of it: one to the "original" relationship 
and the other to the "enrichment"
+    relationship. Each copy will have its own set of attributes added to it.
+</p>
+
+<p>
+    Next, we have the "original" FlowFile being routed to the JoinEnrichment 
processor, while the "enrichment" FlowFile is routed in a different direction. 
Each of these FlowFiles will have an
+    attribute named "enrichment.group.id" with the same value. The 
JoinEnrichment processor then uses this information to correlate the two 
FlowFiles. The "enrichment.role" attribute will also
+    be added to each FlowFile but with a different value. The FlowFile routed 
to "original" will have an enrichment.role of ORIGINAL while the FlowFile 
routed to "enrichment" will have an
+    enrichment.role of ENRICHMENT.
+</p>
+
+<p>
+    The Processors that make up the "enrichment" path will vary from use case 
to use case. In this example, we use the
+    <a 
href="../org.apache.nifi.processors.standard.JoltTransformJSON/index.html">JoltTransformJSON</a>
 processor in order to transform our payload from the original payload into a 
payload that is
+    expected by our web service. We then use the <a 
href="../org.apache.nifi.processors.standard.InvokeHTTP/index.html">InvokeHTTP</a>
 processor in order to gather
+    enrichment data that is relevant to our use case. Other common processors 
to use in this path include
+    <a 
href="../org.apache.nifi.processors.standard.QueryRecord/index.html">QueryRecord</a>,
 <a 
href="../org.apache.nifi.processors.standard.UpdateRecord/index.html">UpdateRecord</a>,
+    <a 
href="../org.apache.nifi.processors.standard.ReplaceText/index.html">ReplaceText</a>,
 JoltTransformRecord, and ScriptedTransformRecord.
+    It is also a common use case to transform the response from the web 
service that is invoked via InvokeHTTP using one or more of these processors.
+</p>
+
+<p>
+    After the enrichment data has been gathered, it does us little good unless 
we are able to somehow combine our enrichment data back with our original 
payload.
+    To achieve this, we use the JoinEnrichment processor. It is responsible 
for combining records from both the "original" FlowFile and the "enrichment" 
FlowFile.
+</p>
+
+<p>
+    The JoinEnrichment Processor is configured with a separate RecordReader 
for the "original" FlowFile and for the "enrichment" FlowFile. This means that 
the original data and the
+    enrichment data can have entirely different schemas and can even be in 
different data formats. For example, our original payload may be CSV data, 
while our enrichment data is a JSON
+    payload. Because we make use of RecordReaders, this is entirely okay. The 
Processor also requires a RecordWriter to use for writing out the enriched 
payload (i.e., the payload that contains
+    the join of both the "original" and the "enrichment" data).
+</p>
+
+<p>
+    The JoinEnrichment Processor offers different strategies for how to 
combine the original records with the enrichment data. Each of these is 
explained here in some detail.
+</p>
+
+<h3>Wrapper</h3>
+The Wrapper strategy is the default. Each record in the original payload is 
expected to have a corresponding record in the enrichment payload. The output 
record will be a record with two
+fields: <code>original</code> and <code>enrichment</code>. Each of these will 
contain the <i>n</i>th record from the corresponding FlowFile. For example, if 
the original FlowFile has the following
+content:
+
+<pre><code>
+id, name, age
+28021, John Doe, 55
+832, Jane Doe, 22
+29201, Jake Doe, 23
+555, Joseph Doe, 2
+</code></pre>
+
+And our enrichment FlowFile has the following content:
+
+<pre><code>
+id, email
+28021, [email protected]
+832, [email protected]
+29201, [email protected]
+</code></pre>
+
+This strategy would produce output that looks like this (assuming a JSON 
Writer):
+<pre><code>
+[
+    {
+        "original": {
+            "id": 28021,
+            "name": "John Doe",
+            "age": 55
+        },
+        "enrichment": {
+            "id": 28021,
+            "email": "[email protected]"
+        }
+    }, {
+        "original": {
+            "id": 832,
+            "name": "Jane Doe",
+            "age": 55

Review comment:
       This should be 22, matching Jane Doe's age in the original FlowFile content above.
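
       To illustrate why the second output record should carry age 22: as the documentation under review describes it, the Wrapper strategy pairs records purely by index, so each "original" field in the output is the n-th input record, unchanged. Below is a minimal sketch of that pairing using plain maps. The class and method names (`WrapperJoinSketch`, `join`, `record`) are hypothetical and are not the `WrapperJoinStrategy` implementation from this PR. Email values are omitted because they are not recoverable from the quoted example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the WrapperJoinStrategy class from the PR.
class WrapperJoinSketch {

    // Pairs the n-th original record with the n-th enrichment record.
    // When one list is shorter than the other, the missing side is null.
    static List<Map<String, Object>> join(List<Map<String, Object>> original,
                                          List<Map<String, Object>> enrichment) {
        final int size = Math.max(original.size(), enrichment.size());
        final List<Map<String, Object>> joined = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            final Map<String, Object> wrapper = new LinkedHashMap<>();
            wrapper.put("original", i < original.size() ? original.get(i) : null);
            wrapper.put("enrichment", i < enrichment.size() ? enrichment.get(i) : null);
            joined.add(wrapper);
        }
        return joined;
    }

    // Small helper that builds an ordered record from alternating keys and values.
    static Map<String, Object> record(Object... keyValues) {
        final Map<String, Object> rec = new LinkedHashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            rec.put((String) keyValues[i], keyValues[i + 1]);
        }
        return rec;
    }

    public static void main(String[] args) {
        final List<Map<String, Object>> original = Arrays.asList(
            record("id", 28021, "name", "John Doe", "age", 55),
            record("id", 832, "name", "Jane Doe", "age", 22),
            record("id", 29201, "name", "Jake Doe", "age", 23),
            record("id", 555, "name", "Joseph Doe", "age", 2));
        final List<Map<String, Object>> enrichment = Arrays.asList(
            record("id", 28021),
            record("id", 832),
            record("id", 29201));

        // Four wrapper records are produced; the last has a null enrichment.
        join(original, enrichment).forEach(System.out::println);
    }
}
```

       Since the join never rewrites field values, the second wrapper's "original" is Jane Doe's input record as-is, so the documented output should show her age as 22.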

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java
##########
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.bin.Bin;
+import org.apache.nifi.processor.util.bin.BinFiles;
+import org.apache.nifi.processor.util.bin.BinManager;
+import org.apache.nifi.processor.util.bin.BinProcessingResult;
+import org.apache.nifi.processor.util.bin.EvictionReason;
+import org.apache.nifi.processors.standard.enrichment.EnrichmentRole;
+import 
org.apache.nifi.processors.standard.enrichment.InsertRecordFieldsJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinInput;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinResult;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.SqlJoinCache;
+import org.apache.nifi.processors.standard.enrichment.SqlJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.WrapperJoinStrategy;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.db.JdbcProperties;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@TriggerWhenEmpty
+@SideEffectFree
+@Tags({"fork", "join", "enrichment", "record", "sql", "wrap", "recordpath", 
"merge", "combine", "streams"})
+@CapabilityDescription("Joins together Records from two different FlowFiles 
where one FlowFile, the 'original' contains arbitrary records and the second 
FlowFile, the 'enrichment' contains " +
+    "additional data that should be used to enrich the first. See Additional 
Details for more information on how to configure this processor and the 
different use cases that it aims to accomplish.")
+@SeeAlso(ForkEnrichment.class)
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+    @WritesAttribute(attribute = "record.count", description = "The number of 
records in the FlowFile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"This Processor will load into heap all FlowFiles that are on its incoming 
queues. While it loads the FlowFiles " +
+    "themselves, and not their content, the FlowFile attributes can be very 
memory intensive. Additionally, if the Join Strategy is set to SQL, the SQL 
engine may require buffering the entire " +
+    "contents of the enrichment FlowFile for each concurrent task. See 
Processor's Additional Details for more details and for steps on how to 
mitigate these concerns.")
+public class JoinEnrichment extends BinFiles {
+    static final String GROUP_ID_ATTRIBUTE = "enrichment.group.id";
+    static final String ENRICHMENT_ROLE_ATTRIBUTE = "enrichment.role";
+    static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+    static final AllowableValue JOIN_WRAPPER = new AllowableValue("Wrapper", 
"Wrapper", "The output is a Record that contains two fields: (1) 'original', 
containing the Record from the original " +
+        "FlowFile and (2) 'enrichment' containing the corresponding Record 
from the enrichment FlowFile. Records will be correlated based on their index 
in the FlowFile. If one FlowFile has more " +
+        "Records than the other, a null value will be used.");
+    static final AllowableValue JOIN_SQL = new AllowableValue("SQL", "SQL", 
"The output is derived by evaluating a SQL SELECT statement that allows for two 
tables: 'original' and 'enrichment'. This" +
+        " allows for SQL JOIN statements to be used in order to correlate the 
Records of the two FlowFiles, so the index in which the Record is encountered 
in the FlowFile does not matter.");
+    static final AllowableValue JOIN_INSERT_ENRICHMENT_FIELDS = new 
AllowableValue("Insert Enrichment Fields", "Insert Enrichment Fields",
+        "The enrichment is joined together with the original FlowFile by 
placing all fields of the enrichment Record into the corresponding Record from 
the original FlowFile. " +
+            "Records will be correlated based on their index in the 
FlowFile.");
+
+    static final PropertyDescriptor ORIGINAL_RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("Original Record Reader")
+        .displayName("Original Record Reader")
+        .description("The Record Reader for reading the 'original' FlowFile")
+        .required(true)
+        .identifiesControllerService(RecordReaderFactory.class)
+        .build();
+    static final PropertyDescriptor ENRICHMENT_RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("Enrichment Record Reader")
+        .displayName("Enrichment Record Reader")
+        .description("The Record Reader for reading the 'enrichment' FlowFile")
+        .required(true)
+        .identifiesControllerService(RecordReaderFactory.class)
+        .build();
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+        .name("Record Writer")
+        .displayName("Record Writer")
+        .description("The Record Writer to use for writing the results. If the 
Record Writer is configured to inherit the schema from the Record, the schema 
that it will inherit will be the result " +
+            "of merging both the 'original' record schema and the 'enrichment' 
record schema.")
+        .required(true)
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .build();
+
+    // TODO: Include in docs: What to do if we need something more complex 
like a script: Use Wrapper strategy to correlate the records and then use a 
ScriptedTransformRecord.
+    static final PropertyDescriptor JOIN_STRATEGY = new 
PropertyDescriptor.Builder()
+        .name("Join Strategy")
+        .displayName("Join Strategy")
+        .description("Specifies how to join the two FlowFiles into a single 
FlowFile")
+        .required(true)
+        .allowableValues(JOIN_WRAPPER, JOIN_SQL, JOIN_INSERT_ENRICHMENT_FIELDS)
+        .defaultValue(JOIN_WRAPPER.getValue())
+        .build();
+    static final PropertyDescriptor SQL = new PropertyDescriptor.Builder()
+        .name("SQL")
+        .displayName("SQL")
+        .description("The SQL SELECT statement to evaluate. Expression 
Language may be provided, but doing so may result in poorer performance. 
Because this Processor is dealing with two FlowFiles " +
+            "at a time, it's also important to understand how attributes will 
be referenced. If both FlowFiles have an attribute with the same name but 
different values, the Expression Language " +
+            "will resolve to the value provided by the 'enrichment' FlowFile.")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .dependsOn(JOIN_STRATEGY, JOIN_SQL)
+        .defaultValue("SELECT original.*, enrichment.* \nFROM original \nLEFT 
OUTER JOIN enrichment \nON original.id = enrichment.id")
+        .build();
+    static final PropertyDescriptor DEFAULT_PRECISION = new 
PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(JdbcProperties.DEFAULT_PRECISION)
+        .required(false)
+        .dependsOn(JOIN_STRATEGY, JOIN_SQL)
+        .build();
+    static final PropertyDescriptor DEFAULT_SCALE = new 
PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(JdbcProperties.DEFAULT_SCALE)
+        .required(false)
+        .dependsOn(JOIN_STRATEGY, JOIN_SQL)
+        .build();
+    static final PropertyDescriptor INSERTION_RECORD_PATH = new 
PropertyDescriptor.Builder()
+        .name("Record Path")
+        .displayName("Record Path")
+        .description("Specifies where in the 'original' Record the 
'enrichment' Record's fields should be inserted. Note that if the RecordPath " +
+            "does not point to any existing field in the original Record, the 
enrichment will not be inserted.")
+        .required(true)
+        .addValidator(new RecordPathValidator())
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .dependsOn(JOIN_STRATEGY, JOIN_INSERT_ENRICHMENT_FIELDS)
+        .build();
+
+    static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+        .name("Max Bin Age")
+        .displayName("Max Bin Age")
+        .description("Once one of the FlowFiles to be merged arrives at the 
processor, specifies the maximum amount of time to wait for the other before 
timing out and routing the one that has " +
+            "arrived to the 'timeout' relationship.")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .defaultValue("10 min")
+        .build();
+    static final PropertyDescriptor MAX_BIN_COUNT = new 
PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(BinFiles.MAX_BIN_COUNT)
+        .defaultValue("10000")
+        .build();
+
+    private static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(Arrays.asList(
+        ORIGINAL_RECORD_READER,
+        ENRICHMENT_RECORD_READER,
+        RECORD_WRITER,
+        JOIN_STRATEGY,
+        SQL,
+        DEFAULT_PRECISION,
+        DEFAULT_SCALE,
+        INSERTION_RECORD_PATH,
+        MAX_BIN_COUNT,
+        TIMEOUT
+    ));
+
+    // Relationships
+    static final Relationship REL_JOINED = new Relationship.Builder()
+        .name("joined")
+        .description("The resultant FlowFile that has Records that are joined 
together from both the original FlowFile and the enrichment FlowFile will be 
routed to this relationship")

Review comment:
       "The resultant FlowFile with Records joined together from both the 
original and enrichment FlowFiles will be routed to this relationship"

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.JoinEnrichment/additionalDetails.html
##########
@@ -0,0 +1,524 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8"/>
+    <title>JoinEnrichment</title>
+
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+</head>
+
+<body>
+
+<h3>Introduction</h3>
+
+<p>
+    The JoinEnrichment processor is designed to be used in conjunction with 
the <a 
href="../org.apache.nifi.processors.standard.ForkEnrichment/index.html">ForkEnrichment
 Processor</a>.
+    Used together, they provide a powerful mechanism for transforming data 
into a separate request payload for gathering enrichment data, gathering that 
enrichment data, optionally transforming
+    the enrichment data, and finally joining together the original payload 
with the enrichment data.
+</p>
+
+
+
+<h3>Typical Dataflow</h3>
+
+<p>
+    A typical dataflow for accomplishing this may look something like this:
+</p>
+
+<img src="fork-join-enrichment.png" style="height: 50%; width: 50%" />
+
+<p>
+    Here, we have a ForkEnrichment processor that is responsible for taking in 
a FlowFile and producing two copies of it: one to the "original" relationship 
and the other to the "enrichment"
+    relationship. Each copy will have its own set of attributes added to it.
+</p>
+
+<p>
+    Next, we have the "original" FlowFile being routed to the JoinEnrichment 
processor, while the "enrichment" FlowFile is routed in a different direction. 
Each of these FlowFiles will have an
+    attribute named "enrichment.group.id" with the same value. The 
JoinEnrichment processor then uses this information to correlate the two 
FlowFiles. The "enrichment.role" attribute will also
+    be added to each FlowFile but with a different value. The FlowFile routed 
to "original" will have an enrichment.role of ORIGINAL while the FlowFile 
routed to "enrichment" will have an
+    enrichment.role of ENRICHMENT.
+</p>
+
+<p>
+    The Processors that make up the "enrichment" path will vary from use case 
to use case. In this example, we use
+    <a 
href="../org.apache.nifi.processors.standard.JoltTransformJSON/index.html">JoltTransformJSON</a>
 processor in order to transform our payload from the original payload into a 
payload that is
+    expected by our web service. We then use the <a 
href="../org.apache.nifi.processors.standard.InvokeHTTP/index.html">InvokeHTTP</a>
 processor in order to gather
+    enrichment data that is relevant to our use case. Other common processors 
to use in this path include
+    <a 
href="../org.apache.nifi.processors.standard.QueryRecord/index.html">QueryRecord</a>,
 <a 
href="../org.apache.nifi.processors.standard.UpdateRecord/index.html">UpdateRecord</a>,
+    <a 
href="../org.apache.nifi.processors.standard.ReplaceText/index.html">ReplaceText</a>,
 JoltTransformRecord, and ScriptedTransformRecord.
+    It is also a common use case to transform the response from the web 
service that is invoked via InvokeHTTP using one or more of these processors.
+</p>
+
+<p>
+    After the enrichment data has been gathered, it does us little good unless 
we are able to somehow combine our enrichment data back with our original 
payload.
+    To achieve this, we use the JoinEnrichment processor. It is responsible 
for combining records from both the "original" FlowFile and the "enrichment" 
FlowFile.
+</p>
+
+<p>
+    The JoinEnrichment Processor is configured with a separate RecordReader 
for the "original" FlowFile and for the "enrichment" FlowFile. This means that 
the original data and the
+    enrichment data can have entirely different schemas and can even be in 
different data formats. For example, our original payload may be CSV data, 
while our enrichment data is a JSON
+    payload. Because we make use of RecordReaders, this is entirely okay. The 
Processor also requires a RecordWriter to use for writing out the enriched 
payload (i.e., the payload that contains
+    the join of both the "original" and the "enrichment" data).
+</p>
+
+<p>
+    The JoinEnrichment Processor offers different strategies for how to 
combine the original records with the enrichment data. Each of these is 
explained here in some detail.
+</p>
+
+<h3>Wrapper</h3>
+The Wrapper strategy is the default. Each record in the original payload is 
expected to have a corresponding record in the enrichment payload. The output 
record will be a record with two
+fields: <code>original</code> and <code>enrichment</code>. Each of these will 
contain the <i>n</i>th record from the corresponding FlowFile. For example, if 
the original FlowFile has the following
+content:
+
+<pre><code>
+id, name, age
+28021, John Doe, 55
+832, Jane Doe, 22
+29201, Jake Doe, 23
+555, Joseph Doe, 2
+</code></pre>
+
+And our enrichment FlowFile has the following content:
+
+<pre><code>
+id, email
+28021, [email protected]
+832, [email protected]
+29201, [email protected]
+</code></pre>
+
+This strategy would produce output that looks like this (assuming a JSON 
Writer):
+<pre><code>
+[
+    {
+        "original": {
+            "id": 28021,
+            "name": "John Doe",
+            "age": 55
+        },
+        "enrichment": {
+            "id": 28021,
+            "email": "[email protected]"
+        }
+    }, {
+        "original": {
+            "id": 832,
+            "name": "Jane Doe",
+            "age": 22
+        },
+        "enrichment": {
+            "id": 832,
+            "email": "[email protected]"
+        }
+    }, {
+        "original": {
+            "id": 29201,
+            "name": "Jake Doe",
+            "age": 23
+        },
+        "enrichment": {
+            "id": 29201,
+            "email": "[email protected]"
+        }
+    }, {
+        "original": {
+            "id": 555,
+            "name": "Joseph Doe",
+            "age": 2
+        },
+        "enrichment": null
+    }
+]
+</code></pre>
+
+<p>
+    With this strategy, the first record of the original FlowFile is coupled 
together with the first record of the enrichment FlowFile. The second record of 
the original FlowFile is coupled
+    together with the second record of the enrichment FlowFile, and so on. If 
one of the FlowFiles has more records than the other, a <code>null</code> value 
will be used.
+</p>
+
+
+
+<h3>Insert Enrichment Fields</h3>
+
+<p>
+    The "Insert Enrichment Fields" strategy inserts all of the fields of the 
"enrichment" record into the original record. The records are correlated by 
their index in the FlowFile. That is,
+    the first record in the "enrichment" FlowFile is inserted into the first 
record in the "original" FlowFile. The second record of the "enrichment" 
FlowFile is inserted into the second
+    record of the "original" FlowFile and so on.
+</p>
+
+<p>
+    When this strategy is selected, the "Record Path" property is required. 
The Record Path is evaluated against the "original" record. Consider, for 
example, the following content for the
+    "original" FlowFile:
+</p>
+
+<pre><code>
+[{
+    "purchase": {
+        "customer": {
+            "loyaltyId": 48202,
+            "firstName": "John",
+            "lastName": "Doe"
+        },
+        "total": 48.28,
+        "items": [
+            {
+                "itemDescription": "book",
+                "price": 24.14,
+                "quantity": 2
+            }
+        ]
+    }
+}, {
+    "purchase": {
+        "customer": {
+            "loyaltyId": 5512,
+            "firstName": "Jane",
+            "lastName": "Doe"
+        },
+        "total": 121.44,
+        "items": [
+            {
+                "itemDescription": "book",
+                "price": 28.15,
+                "quantity": 4
+            }, {
+                "itemDescription": "inkpen",
+                "price": 4.42,
+                "quantity": 2
+            }
+        ]
+    }
+}]
+</code></pre>
+
+Joined using the following enrichment content:
+<pre><code>
+[
+    {
+        "customerDetails": {
+            "id": 48202,
+            "phone": "555-555-5555",
+            "email": "[email protected]"
+        },

Review comment:
       Extra comma here

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java
##########
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.bin.Bin;
+import org.apache.nifi.processor.util.bin.BinFiles;
+import org.apache.nifi.processor.util.bin.BinManager;
+import org.apache.nifi.processor.util.bin.BinProcessingResult;
+import org.apache.nifi.processor.util.bin.EvictionReason;
+import org.apache.nifi.processors.standard.enrichment.EnrichmentRole;
+import 
org.apache.nifi.processors.standard.enrichment.InsertRecordFieldsJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinInput;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinResult;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.SqlJoinCache;
+import org.apache.nifi.processors.standard.enrichment.SqlJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.WrapperJoinStrategy;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.db.JdbcProperties;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@TriggerWhenEmpty
+@SideEffectFree
+@Tags({"fork", "join", "enrichment", "record", "sql", "wrap", "recordpath", 
"merge", "combine", "streams"})
+@CapabilityDescription("Joins together Records from two different FlowFiles 
where one FlowFile, the 'original' contains arbitrary records and the second 
FlowFile, the 'enrichment' contains " +
+    "additional data that should be used to enrich the first. See Additional 
Details for more information on how to configure this processor and the 
different use cases that it aims to accomplish.")
+@SeeAlso(ForkEnrichment.class)
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+    @WritesAttribute(attribute = "record.count", description = "The number of 
records in the FlowFile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"This Processor will load into heap all FlowFiles that are on its incoming 
queues. While it loads the FlowFiles " +
+    "themselves, and not their content, the FlowFile attributes can be very 
memory intensive. Additionally, if the Join Strategy is set to SQL, the SQL 
engine may require buffering the entire " +
+    "contents of the enrichment FlowFile for each concurrent task. See 
Processor's Additional Details for more details and for steps on how to 
mitigate these concerns.")
+public class JoinEnrichment extends BinFiles {
+    static final String GROUP_ID_ATTRIBUTE = "enrichment.group.id";
+    static final String ENRICHMENT_ROLE_ATTRIBUTE = "enrichment.role";
+    static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+    static final AllowableValue JOIN_WRAPPER = new AllowableValue("Wrapper", 
"Wrapper", "The output is a Record that contains two fields: (1) 'original', 
containing the Record from the original " +
+        "FlowFile and (2) 'enrichment' containing the corresponding Record 
from the enrichment FlowFile. Records will be correlated based on their index 
in the FlowFile. If one FlowFile has more " +
+        "Records than the other, a null value will be used.");
+    static final AllowableValue JOIN_SQL = new AllowableValue("SQL", "SQL", 
"The output is derived by evaluating a SQL SELECT statement that allows for two 
tables: 'original' and 'enrichment'. This" +
+        " allows for SQL JOIN statements to be used in order to correlate the 
Records of the two FlowFiles, so the index in which the Record is encountered 
in the FlowFile does not matter.");
+    static final AllowableValue JOIN_INSERT_ENRICHMENT_FIELDS = new 
AllowableValue("Insert Enrichment Fields", "Insert Enrichment Fields",
+        "The enrichment is joined together with the original FlowFile by 
placing all fields of the enrichment Record into the corresponding Record from 
the original FlowFile. " +
+            "Records will be correlated based on their index in the 
FlowFile.");
+
+    static final PropertyDescriptor ORIGINAL_RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("Original Record Reader")
+        .displayName("Original Record Reader")
+        .description("The Record Reader for reading the 'original' FlowFile")
+        .required(true)
+        .identifiesControllerService(RecordReaderFactory.class)
+        .build();
+    static final PropertyDescriptor ENRICHMENT_RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("Enrichment Record Reader")
+        .displayName("Enrichment Record Reader")
+        .description("The Record Reader for reading the 'enrichment' FlowFile")
+        .required(true)
+        .identifiesControllerService(RecordReaderFactory.class)
+        .build();
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+        .name("Record Writer")
+        .displayName("Record Writer")
+        .description("The Record Writer to use for writing the results. If the 
Record Writer is configured to inherit the schema from the Record, the schema 
that it will inherit will be the result " +
+            "of merging both the 'original' record schema and the 'enrichment' 
record schema.")
+        .required(true)
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .build();
+
+    // TODO: Include in docs: What to do if we need something more complex 
like a script: Use Wrapper strategy to correlate the records and then use a 
ScriptedTransformRecord.
+    static final PropertyDescriptor JOIN_STRATEGY = new 
PropertyDescriptor.Builder()
+        .name("Join Strategy")
+        .displayName("Join Strategy")
+        .description("Specifies how to join the two FlowFiles into a single 
FlowFile")
+        .required(true)
+        .allowableValues(JOIN_WRAPPER, JOIN_SQL, JOIN_INSERT_ENRICHMENT_FIELDS)
+        .defaultValue(JOIN_WRAPPER.getValue())
+        .build();
+    static final PropertyDescriptor SQL = new PropertyDescriptor.Builder()
+        .name("SQL")
+        .displayName("SQL")
+        .description("The SQL SELECT statement to evaluate. Expression 
Language may be provided, but doing so may result in poorer performance. 
Because this Processor is dealing with two FlowFiles " +
+            "at a time, it's also important to understand how attributes will 
be referenced. If both FlowFiles have an attribute with the same name but 
different values, the Expression Language " +
+            "will resolve to the value provided by the 'enrichment' FlowFile.")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .dependsOn(JOIN_STRATEGY, JOIN_SQL)
+        .defaultValue("SELECT original.*, enrichment.* \nFROM original \nLEFT 
OUTER JOIN enrichment \nON original.id = enrichment.id")
+        .build();
+    static final PropertyDescriptor DEFAULT_PRECISION = new 
PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(JdbcProperties.DEFAULT_PRECISION)
+        .required(false)
+        .dependsOn(JOIN_STRATEGY, JOIN_SQL)
+        .build();
+    static final PropertyDescriptor DEFAULT_SCALE = new 
PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(JdbcProperties.DEFAULT_SCALE)
+        .required(false)
+        .dependsOn(JOIN_STRATEGY, JOIN_SQL)
+        .build();
+    static final PropertyDescriptor INSERTION_RECORD_PATH = new 
PropertyDescriptor.Builder()
+        .name("Record Path")
+        .displayName("Record Path")
+        .description("Specifies where in the 'original' Record the 
'enrichment' Record's fields should be inserted. Note that if the RecordPath " +
+            "does not point to any existing field in the original Record, the 
enrichment will not be inserted.")
+        .required(true)
+        .addValidator(new RecordPathValidator())
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .dependsOn(JOIN_STRATEGY, JOIN_INSERT_ENRICHMENT_FIELDS)
+        .build();
+
+    static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()

Review comment:
       When I configured this for 30 seconds, the timeout did not appear to 
work. I stopped the processor on the enrichment branch and let the original 
FlowFile wait at the JoinEnrichment processor for over two minutes. When I 
finally released the enrichment, the two were joined correctly; however, I 
would have expected the timeout to occur before then.
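
To make the expectation concrete, here is a minimal sketch of the kind of age check the comment has in mind. It is not the BinFiles implementation: `BinTimeoutSketch`, `PendingBin`, and `isExpired` are hypothetical names invented for illustration, and the only assumption is that a waiting bin records when it was created.

```java
import java.time.Duration;
import java.time.Instant;

public class BinTimeoutSketch {

    // Hypothetical stand-in for a bin holding an 'original' FlowFile that is
    // still waiting for its 'enrichment' counterpart.
    static final class PendingBin {
        final Instant createdAt;

        PendingBin(Instant createdAt) {
            this.createdAt = createdAt;
        }
    }

    // Returns true once the bin has waited at least as long as the timeout.
    static boolean isExpired(PendingBin bin, Duration timeout, Instant now) {
        return Duration.between(bin.createdAt, now).compareTo(timeout) >= 0;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2022-02-16T12:00:00Z");
        PendingBin bin = new PendingBin(start);
        Duration timeout = Duration.ofSeconds(30);

        // 10 seconds in: still within the 30 second timeout.
        System.out.println(isExpired(bin, timeout, start.plusSeconds(10)));  // false
        // Two minutes in: the scenario described above, well past the timeout.
        System.out.println(isExpired(bin, timeout, start.plusSeconds(120))); // true
    }
}
```

With a 30 second timeout, a check like this would report the bin as expired long before the two-minute mark, which is why the observed behavior looks wrong.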

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java
##########
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.bin.Bin;
+import org.apache.nifi.processor.util.bin.BinFiles;
+import org.apache.nifi.processor.util.bin.BinManager;
+import org.apache.nifi.processor.util.bin.BinProcessingResult;
+import org.apache.nifi.processor.util.bin.EvictionReason;
+import org.apache.nifi.processors.standard.enrichment.EnrichmentRole;
+import 
org.apache.nifi.processors.standard.enrichment.InsertRecordFieldsJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinInput;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinResult;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.SqlJoinCache;
+import org.apache.nifi.processors.standard.enrichment.SqlJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.WrapperJoinStrategy;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.db.JdbcProperties;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@TriggerWhenEmpty
+@SideEffectFree
+@Tags({"fork", "join", "enrichment", "record", "sql", "wrap", "recordpath", 
"merge", "combine", "streams"})
+@CapabilityDescription("Joins together Records from two different FlowFiles 
where one FlowFile, the 'original' contains arbitrary records and the second 
FlowFile, the 'enrichment' contains " +
+    "additional data that should be used to enrich the first. See Additional 
Details for more information on how to configure this processor and the 
different use cases that it aims to accomplish.")
+@SeeAlso(ForkEnrichment.class)
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+    @WritesAttribute(attribute = "record.count", description = "The number of 
records in the FlowFile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"This Processor will load into heap all FlowFiles that are on its incoming 
queues. While it loads the FlowFiles " +
+    "themselves, and not their content, the FlowFile attributes can be very 
memory intensive. Additionally, if the Join Strategy is set to SQL, the SQL 
engine may require buffering the entire " +
+    "contents of the enrichment FlowFile for each concurrent task. See 
Processor's Additional Details for more details and for steps on how to 
mitigate these concerns.")
+public class JoinEnrichment extends BinFiles {
+    static final String GROUP_ID_ATTRIBUTE = "enrichment.group.id";
+    static final String ENRICHMENT_ROLE_ATTRIBUTE = "enrichment.role";
+    static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+    static final AllowableValue JOIN_WRAPPER = new AllowableValue("Wrapper", 
"Wrapper", "The output is a Record that contains two fields: (1) 'original', 
containing the Record from the original " +
+        "FlowFile and (2) 'enrichment' containing the corresponding Record 
from the enrichment FlowFile. Records will be correlated based on their index 
in the FlowFile. If one FlowFile has more " +
+        "Records than the other, a null value will be used.");
+    static final AllowableValue JOIN_SQL = new AllowableValue("SQL", "SQL", 
"The output is derived by evaluating a SQL SELECT statement that allows for two 
tables: 'original' and 'enrichment'. This" +
+        " allows for SQL JOIN statements to be used in order to correlate the 
Records of the two FlowFiles, so the index in which the Record is encountered 
in the FlowFile does not matter.");
+    static final AllowableValue JOIN_INSERT_ENRICHMENT_FIELDS = new 
AllowableValue("Insert Enrichment Fields", "Insert Enrichment Fields",
+        "The enrichment is joined together with the original FlowFile by 
placing all fields of the enrichment Record into the corresponding Record from 
the original FlowFile. " +
+            "Records will be correlated based on their index in the 
FlowFile.");
+
+    static final PropertyDescriptor ORIGINAL_RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("Original Record Reader")
+        .displayName("Original Record Reader")
+        .description("The Record Reader for reading the 'original' FlowFile")
+        .required(true)
+        .identifiesControllerService(RecordReaderFactory.class)
+        .build();
+    static final PropertyDescriptor ENRICHMENT_RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("Enrichment Record Reader")
+        .displayName("Enrichment Record Reader")
+        .description("The Record Reader for reading the 'enrichment' FlowFile")
+        .required(true)
+        .identifiesControllerService(RecordReaderFactory.class)
+        .build();
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+        .name("Record Writer")
+        .displayName("Record Writer")
+        .description("The Record Writer to use for writing the results. If the 
Record Writer is configured to inherit the schema from the Record, the schema 
that it will inherit will be the result " +
+            "of merging both the 'original' record schema and the 'enrichment' 
record schema.")
+        .required(true)
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .build();
+
+    // TODO: Include in docs: What to do if we need something more complex 
like a script: Use Wrapper strategy to correlate the records and then use a 
ScriptedTransformRecord.

Review comment:
       Did this make it to the docs?
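
For context on what the TODO describes, the following is a rough sketch of the index-based pairing that the Wrapper strategy's description implies, using plain Java maps in place of NiFi Records. `WrapperJoinSketch` and `wrap` are hypothetical names, not part of the PR; the point is only to show the shape of the data a downstream script (for example in ScriptedTransformRecord) would receive.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WrapperJoinSketch {

    // Pairs the n-th 'original' record with the n-th 'enrichment' record.
    // If one list is shorter, the missing side is represented by null.
    static List<Map<String, Object>> wrap(List<Map<String, Object>> original,
                                          List<Map<String, Object>> enrichment) {
        int size = Math.max(original.size(), enrichment.size());
        List<Map<String, Object>> out = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            // HashMap is used because it permits null values.
            Map<String, Object> wrapper = new HashMap<>();
            wrapper.put("original", i < original.size() ? original.get(i) : null);
            wrapper.put("enrichment", i < enrichment.size() ? enrichment.get(i) : null);
            out.add(wrapper);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> john = Map.of("id", 28021, "name", "John Doe");
        Map<String, Object> joseph = Map.of("id", 555, "name", "Joseph Doe");
        Map<String, Object> johnDetails = Map.of("id", 28021, "phone", "555-555-5555");

        // Two original records but only one enrichment record.
        List<Map<String, Object>> result = wrap(List.of(john, joseph), List.of(johnDetails));
        for (Map<String, Object> wrapper : result) {
            System.out.println(wrapper);
        }
    }
}
```

A script that needs more complex logic than the three built-in strategies can then read both `original` and `enrichment` from each wrapper record and combine them however it likes.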

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.JoinEnrichment/additionalDetails.html
##########
@@ -0,0 +1,524 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8"/>
+    <title>JoinEnrichment</title>
+
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+</head>
+
+<body>
+
+<h3>Introduction</h3>
+
+<p>
+    The JoinEnrichment processor is designed to be used in conjunction with 
the <a 
href="../org.apache.nifi.processors.standard.ForkEnrichment/index.html">ForkEnrichment
 Processor</a>.
+    Used together, they provide a powerful mechanism for transforming data 
into a separate request payload for gathering enrichment data, gathering that 
enrichment data, optionally transforming
+    the enrichment data, and finally joining together the original payload 
with the enrichment data.
+</p>
+
+
+
+<h3>Typical Dataflow</h3>
+
+<p>
+    A typical dataflow for accomplishing this may look something like this:
+</p>
+
+<img src="fork-join-enrichment.png" style="height: 50%; width: 50%" />
+
+<p>
+    Here, we have a ForkEnrichment processor that is responsible for taking in 
a FlowFile and producing two copies of it: one to the "original" relationship 
and the other to the "enrichment"
+    relationship. Each copy will have its own set of attributes added to it.
+</p>
+
+<p>
+    Next, we have the "original" FlowFile being routed to the JoinEnrichment 
processor, while the "enrichment" FlowFile is routed in a different direction. 
Each of these FlowFiles will have an
+    attribute named "enrichment.group.id" with the same value. The 
JoinEnrichment processor then uses this information to correlate the two 
FlowFiles. The "enrichment.role" attribute will also
+    be added to each FlowFile but with a different value. The FlowFile routed 
to "original" will have an enrichment.role of ORIGINAL while the FlowFile 
routed to "enrichment" will have an
+    enrichment.role of ENRICHMENT.
+</p>
+
+<p>
+    The Processors that make up the "enrichment" path will vary from use case 
to use case. In this example, we use
+    <a 
href="../org.apache.nifi.processors.standard.JoltTransformJSON/index.html">JoltTransformJSON</a>
 processor in order to transform our payload from the original payload into a 
payload that is
+    expected by our web service. We then use the <a 
href="../org.apache.nifi.processors.standard.InvokeHTTP/index.html">InvokeHTTP</a>
 processor in order to gather
+    enrichment data that is relevant to our use case. Other common processors 
to use in this path include
+    <a 
href="../org.apache.nifi.processors.standard.QueryRecord/index.html">QueryRecord</a>,
 <a 
href="../org.apache.nifi.processors.standard.UpdateRecord/index.html">UpdateRecord</a>,
+    <a 
href="../org.apache.nifi.processors.standard.ReplaceText/index.html">ReplaceText</a>,
 JoltTransformRecord, and ScriptedTransformRecord.
+    It is also a common use case to transform the response from the web 
service that is invoked via InvokeHTTP using one or more of these processors.
+</p>
+
+<p>
+    After the enrichment data has been gathered, it does us little good unless 
we are able to somehow combine our enrichment data back with our original 
payload.
+    To achieve this, we use the JoinEnrichment processor. It is responsible 
for combining records from both the "original" FlowFile and the "enrichment" 
FlowFile.
+</p>
+
+<p>
+    The JoinEnrichment Processor is configured with a separate RecordReader 
for the "original" FlowFile and for the "enrichment" FlowFile. This means that 
the original data and the
+    enrichment data can have entirely different schemas and can even be in 
different data formats. For example, our original payload may be CSV data, 
while our enrichment data is a JSON
+    payload. Because we make use of RecordReaders, this is entirely okay. The 
Processor also requires a RecordWriter to use for writing out the enriched 
payload (i.e., the payload that contains
+    the join of both the "original" and the "enrichment" data).
+</p>
+
+<p>
+    The JoinEnrichment Processor offers different strategies for how to 
combine the original records with the enrichment data. Each of these is 
explained here in some detail.
+</p>
+
+<h3>Wrapper</h3>
+The Wrapper strategy is the default. Each record in the original payload is 
expected to have a corresponding record in the enrichment payload. The output 
record will be a record with two
+fields: <code>original</code> and <code>enrichment</code>. Each of these will 
contain the <i>n</i>th record from the corresponding FlowFile. For example, if 
the original FlowFile has the following
+content:
+
+<pre><code>
+id, name, age
+28021, John Doe, 55
+832, Jane Doe, 22
+29201, Jake Doe, 23
+555, Joseph Doe, 2
+</code></pre>
+
+And our enrichment FlowFile has the following content:
+
+<pre><code>
+id, email
+28021, [email protected]
+832, [email protected]
+29201, [email protected]
+</code></pre>
+
+This strategy would produce output that looks like this (assuming a JSON 
Writer):
+<pre><code>
+[
+    {
+        "original": {
+            "id": 28021,
+            "name": "John Doe",
+            "age": 55
+        },
+        "enrichment": {
+            "id": 28021,
+            "email": "[email protected]"
+        }
+    }, {
+        "original": {
+            "id": 832,
+            "name": "Jane Doe",
+            "age": 22
+        },
+        "enrichment": {
+            "id": 832,
+            "email": "[email protected]"
+        }
+    }, {
+        "original": {
+            "id": 29201,
+            "name": "Jake Doe",
+            "age": 23
+        },
+        "enrichment": {
+            "id": 29201,
+            "email": "[email protected]"
+        }
+    }, {
+        "original": {
+            "id": 555,
+            "name": "Joseph Doe",
+            "age": 2
+        },
+        "enrichment": null
+    }
+]
+</code></pre>
+
+<p>
+    With this strategy, the first record of the original FlowFile is coupled 
together with the first record of the enrichment FlowFile. The second record of 
the original FlowFile is coupled
+    together with the second record of the enrichment FlowFile, and so on. If 
one of the FlowFiles has more records than the other, a <code>null</code> value 
will be used.
+</p>
+
+
+
+<h3>Insert Enrichment Fields</h3>
+
+<p>
+    The "Insert Enrichment Fields" strategy inserts all of the fields of the 
"enrichment" record into the original record. The records are correlated by 
their index in the FlowFile. That is,
+    the first record in the "enrichment" FlowFile is inserted into the first 
record in the "original" FlowFile. The second record of the "enrichment" 
FlowFile is inserted into the second
+    record of the "original" FlowFile and so on.
+</p>
+
+<p>
+    When this strategy is selected, the "Record Path" property is required. 
The Record Path is evaluated against the "original" record. Consider, for 
example, the following content for the
+    "original" FlowFile:
+</p>
+
+<pre><code>
+[{
+    "purchase": {
+        "customer": {
+            "loyaltyId": 48202,
+            "firstName": "John",
+            "lastName": "Doe"
+        },
+        "total": 48.28,
+        "items": [
+            {
+                "itemDescription": "book",
+                "price": 24.14,
+                "quantity": 2
+            }
+        ]
+    }
+}, {
+    "purchase": {
+        "customer": {
+            "loyaltyId": 5512,
+            "firstName": "Jane",
+            "lastName": "Doe"
+        },
+        "total": 121.44,
+        "items": [
+            {
+                "itemDescription": "book",
+                "price": 28.15,
+                "quantity": 4
+            }, {
+                "itemDescription": "inkpen",
+                "price": 4.42,
+                "quantity": 2
+            }
+        ]
+    }
+}]
+</code></pre>
+
+Joined using the following enrichment content:
+<pre><code>
+[
+    {
+        "customerDetails": {
+            "id": 48202,
+            "phone": "555-555-5555",
+            "email": "[email protected]"
+        },
+        }
+        "customerDetails": {
+            "id": 5512,
+            "phone": "555-555-5511",
+            "email": "[email protected]"
+        }
+    }
+]
+</code></pre>
+
+<p>
+    Let us then consider that a Record Path is used with a value of 
"/purchase/customer".
+    This would yield the following results:
+</p>
+
+<pre><code>
+[{
+    "purchase": {
+        "customer": {
+            "loyaltyId": 48202,
+            "firstName": "John",
+            "lastName": "Doe",
+            "customerDetails": {
+                "id": 48202,
+                "phone": "555-555-5555",
+                "email": "[email protected]"
+            }
+        },
+        "total": 48.28,
+        "items": [
+            {
+                "itemDescription": "book",
+                "price": 24.14,
+                "quantity": 2
+            }
+        ]
+    }
+}, {
+    "purchase": {
+        "customer": {
+            "loyaltyId": 5512,
+            "firstName": "Jane",
+            "lastName": "Doe",
+            "customerDetails": {
+                "id": 5512,
+                "phone": "555-555-5511",
+                "email": "[email protected]"
+            }
+        },
+        "total": 121.44,
+        "items": [
+            {
+                "itemDescription": "book",
+                "price": 28.15,
+                "quantity": 4
+            }, {
+                "itemDescription": "inkpen",
+                "price": 4.42,
+                "quantity": 2
+            }
+        ]
+    }
+}]
+</code></pre>
+
+
+
+<h3>SQL</h3>
+
+<p>
+The SQL strategy provides an important capability that differs from the 
others, in that it allows for correlating the records in the "original" 
FlowFile and the records in the "enrichment" FlowFile
+in ways other than index based. That is, the SQL-based strategy doesn't 
necessarily correlate the first record of the original FlowFile with the first 
record of the enrichment FlowFile. Instead, it
+allows the records to be correlated using standard SQL JOIN expressions.
+</p>
+
+<p>
+A common use case for this is to create a payload to query some web service. 
The response contains identifiers with additional information for enrichment, 
but the order of the records in the
+enrichment may not correspond to the order of the records in the original.
+</p>
+
+<p>
+As an example, consider the following original payload, in CSV:
+</p>
+
+<pre><code>
+id, name, age
+28021, John Doe, 55
+832, Jane Doe, 22
+29201, Jake Doe, 23
+555, Joseph Doe, 2
+</code></pre>
+
+<p>
+Additionally, consider the following payload for the enrichment data:
+</p>
+
+<pre><code>
+customer_id, customer_email, customer_name, customer_since
+555, [email protected], Joe Doe, 08/Dec/14
+832, [email protected], Mrs. Doe, 14/Nov/14
+28021, [email protected], John Doe, 22/Jan/22
+</code></pre>
+
+<p>
+When making use of the SQL strategy, we must provide a SQL SELECT statement to 
combine both our original data and our enrichment data into a single FlowFile. 
To do this, we treat our original
+FlowFile as its own table with the name "original" while we treat the 
enrichment data as its own table with the name "enrichment".
+</p>
+
+<p>
+Given this, we might combine all of the data using a simple query such as:
+</p>
+<pre><code>
+SELECT o.*, e.*
+FROM original o
+JOIN enrichment e
+ON o.id = e.customer_id
+</code></pre>
+
+<p>
+And this would provide the following output:
+</p>
+
+<pre><code>
+id, name, age, customer_id, customer_email, customer_name, customer_since
+28021, John Doe, 55, 28021, [email protected], John Doe, 22/Jan/22
+832, Jane Doe, 22, 832, [email protected], Mrs. Doe, 14/Nov/14
+555, Joseph Doe, 2, 555, [email protected], Joe Doe, 08/Dec/14
+</code></pre>
+
+<p>
+Note that in this case, the record for Jake Doe was removed because we used a 
JOIN, rather than an OUTER JOIN. We could instead use a LEFT OUTER JOIN to 
ensure that we retain all records from the
+original FlowFile and simply provide null values for any missing records in 
the enrichment:
+</p>
+
+<pre><code>
+SELECT o.*, e.*
+FROM original o
+LEFT OUTER JOIN enrichment e
+ON o.id = e.customer_id
+</code></pre>
+
+<p>
+Which would produce the following output:
+</p>
+
+<pre><code>
+id, name, age, customer_id, customer_email, customer_name, customer_since
+28021, John Doe, 55, 28021, [email protected], John Doe, 22/Jan/22
+832, Jane Doe, 22, 832, [email protected], Mrs. Doe, 14/Nov/14
+29201, Jake Doe, 23,,,,
+555, Joseph Doe, 2, 555, [email protected], Joe Doe, 08/Dec/14
+</code></pre>
+
+<p>
+But SQL is far more expressive than this, allowing us to perform far more 
powerful expressions. In this case, we probably don't want both the "id" and 
"customer_id" fields, or the "name" and
+"customer_name" fields. Let's consider, though, that the enrichment provides 
the customer's preferred name instead of their legal name. We might want to 
drop the customer_since column, as it
+doesn't make sense for our use case. We might then change our SQL to the 
following:
+</p>
+
+<pre><code>
+SELECT o.id, o.name, e.customer_name AS preferred_name, o.age, 
e.customer_email AS email
+FROM original o
+LEFT OUTER JOIN enrichment e
+ON o.id = e.customer_id
+</code></pre>
+
+<p>
+And this will produce a more convenient output:
+</p>
+
+<pre><code>
+id, name, preferred_name, age, email
+28021, John Doe, John Doe, 55, [email protected]
+832, Jane Doe, Mrs. Doe, 22, [email protected]
+29201, Jake Doe,, 23,
+555, Joseph Doe, Joe Doe, 2, [email protected]
+</code></pre>
+
+<p>
+So we can see tremendous power from the SQL strategy. However, there is a very 
important consideration that must be taken into account when using the SQL 
strategy.
+</p>
+
+<p>
+<b>WARNING:</b> while the SQL strategy provides us great power, it may require 
significant amounts of heap. Depending on the query, the SQL engine may require 
buffering the contents of the entire
+"enrichment" FlowFile in memory, in Java's heap. Additionally, if the 
Processor is scheduled with multiple concurrent tasks, each of the tasks may 
hold the entire contents of the enrichment
+FlowFile in memory. This can lead to heap exhaustion and cause stability 
problems or OutOfMemoryErrors to occur.
+</p>
+
+<p>
+There are a couple of options that will help to mitigate these concerns.
+</p>
+<ol>
+    <li>
+        Split into smaller chunks. It is generally ill-advised to split 
Record-oriented data into many tiny FlowFiles, as NiFi tends to perform best 
with larger FlowFiles. The sweet spot for NiFi
+        tends to be around 300 KB to 3 MB in size. So we do not want to break 
a large FlowFile with 100,000 records into 100,000 FlowFiles each with 1 
record. It may be advantages, though, before

Review comment:
       advantageous*
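
As background for the memory warning in the SQL section of the docs above, here is a minimal sketch of a hash-based LEFT OUTER JOIN in plain Java. This is not how the processor's SQL engine is implemented; `LeftOuterJoinSketch` and `leftOuterJoin` are hypothetical names, and the sketch assumes each join key appears at most once on the enrichment side. It only illustrates why a key-based join tends to buffer the whole enrichment side in heap before any original record can be emitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LeftOuterJoinSketch {

    static List<Map<String, Object>> leftOuterJoin(List<Map<String, Object>> original,
                                                   List<Map<String, Object>> enrichment,
                                                   String originalKey,
                                                   String enrichmentKey) {
        // The entire enrichment side is held in memory, indexed by join key.
        // This is the buffering cost that grows with the enrichment size.
        Map<Object, Map<String, Object>> index = new HashMap<>();
        for (Map<String, Object> e : enrichment) {
            index.put(e.get(enrichmentKey), e);
        }

        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> o : original) {
            // Every original record is kept, matched or not.
            Map<String, Object> joined = new LinkedHashMap<>(o);
            Map<String, Object> match = index.get(o.get(originalKey));
            if (match != null) {
                joined.putAll(match);
            }
            // In this sketch, unmatched records simply lack the enrichment
            // fields instead of carrying explicit nulls.
            out.add(joined);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> jane = Map.of("id", 832, "name", "Jane Doe");
        Map<String, Object> jake = Map.of("id", 29201, "name", "Jake Doe");
        Map<String, Object> janeDetails = Map.of("customer_id", 832, "customer_name", "Mrs. Doe");

        List<Map<String, Object>> rows =
                leftOuterJoin(List.of(jane, jake), List.of(janeDetails), "id", "customer_id");
        for (Map<String, Object> row : rows) {
            System.out.println(row);
        }
    }
}
```

Because the index is built per join, running several concurrent tasks would mean several such indexes in heap at once, which matches the concern the warning raises.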




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
