markap14 commented on a change in pull request #5772:
URL: https://github.com/apache/nifi/pull/5772#discussion_r811987920



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java
##########
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.bin.Bin;
+import org.apache.nifi.processor.util.bin.BinFiles;
+import org.apache.nifi.processor.util.bin.BinManager;
+import org.apache.nifi.processor.util.bin.BinProcessingResult;
+import org.apache.nifi.processor.util.bin.EvictionReason;
+import org.apache.nifi.processors.standard.enrichment.EnrichmentRole;
+import 
org.apache.nifi.processors.standard.enrichment.InsertRecordFieldsJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinInput;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinResult;
+import org.apache.nifi.processors.standard.enrichment.RecordJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.SqlJoinCache;
+import org.apache.nifi.processors.standard.enrichment.SqlJoinStrategy;
+import org.apache.nifi.processors.standard.enrichment.WrapperJoinStrategy;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.db.JdbcProperties;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@TriggerWhenEmpty
+@SideEffectFree
+@Tags({"fork", "join", "enrichment", "record", "sql", "wrap", "recordpath", 
"merge", "combine", "streams"})
+@CapabilityDescription("Joins together Records from two different FlowFiles 
where one FlowFile, the 'original' contains arbitrary records and the second 
FlowFile, the 'enrichment' contains " +
+    "additional data that should be used to enrich the first. See Additional 
Details for more information on how to configure this processor and the 
different use cases that it aims to accomplish.")
+@SeeAlso(ForkEnrichment.class)
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+    @WritesAttribute(attribute = "record.count", description = "The number of 
records in the FlowFile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"This Processor will load into heap all FlowFiles that are on its incoming 
queues. While it loads the FlowFiles " +
+    "themselves, and not their content, the FlowFile attributes can be very 
memory intensive. Additionally, if the Join Strategy is set to SQL, the SQL 
engine may require buffering the entire " +
+    "contents of the enrichment FlowFile for each concurrent task. See 
Processor's Additional Details for more details and for steps on how to 
mitigate these concerns.")
+public class JoinEnrichment extends BinFiles {
+    static final String GROUP_ID_ATTRIBUTE = "enrichment.group.id";
+    static final String ENRICHMENT_ROLE_ATTRIBUTE = "enrichment.role";
+    static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+    static final AllowableValue JOIN_WRAPPER = new AllowableValue("Wrapper", 
"Wrapper", "The output is a Record that contains two fields: (1) 'original', 
containing the Record from the original " +
+        "FlowFile and (2) 'enrichment' containing the corresponding Record 
from the enrichment FlowFile. Records will be correlated based on their index 
in the FlowFile. If one FlowFile has more " +
+        "Records than the other, a null value will be used.");
+    static final AllowableValue JOIN_SQL = new AllowableValue("SQL", "SQL", 
"The output is derived by evaluating a SQL SELECT statement that allows for two 
tables: 'original' and 'enrichment'. This" +
+        " allows for SQL JOIN statements to be used in order to correlate the 
Records of the two FlowFiles, so the index in which the Record is encountered 
in the FlowFile does not matter.");
+    static final AllowableValue JOIN_INSERT_ENRICHMENT_FIELDS = new 
AllowableValue("Insert Enrichment Fields", "Insert Enrichment Fields",
+        "The enrichment is joined together with the original FlowFile by 
placing all fields of the enrichment Record into the corresponding Record from 
the original FlowFile. " +
+            "Records will be correlated based on their index in the 
FlowFile.");
+
+    static final PropertyDescriptor ORIGINAL_RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("Original Record Reader")
+        .displayName("Original Record Reader")
+        .description("The Record Reader for reading the 'original' FlowFile")
+        .required(true)
+        .identifiesControllerService(RecordReaderFactory.class)
+        .build();
+    static final PropertyDescriptor ENRICHMENT_RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("Enrichment Record Reader")
+        .displayName("Enrichment Record Reader")
+        .description("The Record Reader for reading the 'enrichment' FlowFile")
+        .required(true)
+        .identifiesControllerService(RecordReaderFactory.class)
+        .build();
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+        .name("Record Writer")
+        .displayName("Record Writer")
+        .description("The Record Writer to use for writing the results. If the 
Record Writer is configured to inherit the schema from the Record, the schema 
that it will inherit will be the result " +
+            "of merging both the 'original' record schema and the 'enrichment' 
record schema.")
+        .required(true)
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .build();
+
+    // TODO: Include in docs: What to do if we need something more complex like a script: Use Wrapper strategy to correlate the records and then use a ScriptedTransformRecord.

Review comment:
       Yup :) Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to