markap14 commented on a change in pull request #5772: URL: https://github.com/apache/nifi/pull/5772#discussion_r811987920
########## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java ########## @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.bin.Bin; +import org.apache.nifi.processor.util.bin.BinFiles; +import org.apache.nifi.processor.util.bin.BinManager; +import org.apache.nifi.processor.util.bin.BinProcessingResult; +import org.apache.nifi.processor.util.bin.EvictionReason; +import org.apache.nifi.processors.standard.enrichment.EnrichmentRole; +import org.apache.nifi.processors.standard.enrichment.InsertRecordFieldsJoinStrategy; +import org.apache.nifi.processors.standard.enrichment.RecordJoinInput; +import org.apache.nifi.processors.standard.enrichment.RecordJoinResult; +import org.apache.nifi.processors.standard.enrichment.RecordJoinStrategy; +import org.apache.nifi.processors.standard.enrichment.SqlJoinCache; +import org.apache.nifi.processors.standard.enrichment.SqlJoinStrategy; +import org.apache.nifi.processors.standard.enrichment.WrapperJoinStrategy; +import org.apache.nifi.record.path.validation.RecordPathValidator; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.db.JdbcProperties; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@TriggerWhenEmpty +@SideEffectFree +@Tags({"fork", "join", "enrichment", "record", "sql", "wrap", "recordpath", "merge", "combine", "streams"}) +@CapabilityDescription("Joins together Records from two different FlowFiles where one FlowFile, the 'original' contains arbitrary records and the second FlowFile, the 'enrichment' contains " + + "additional data that should be used to enrich the first. See Additional Details for more information on how to configure this processor and the different use cases that it aims to accomplish.") +@SeeAlso(ForkEnrichment.class) +@WritesAttributes({ + @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"), + @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "This Processor will load into heap all FlowFiles that are on its incoming queues. While it loads the FlowFiles " + + "themselves, and not their content, the FlowFile attributes can be very memory intensive. Additionally, if the Join Strategy is set to SQL, the SQL engine may require buffering the entire " + + "contents of the enrichment FlowFile for each concurrent task. See Processor's Additional Details for more details and for steps on how to mitigate these concerns.") +public class JoinEnrichment extends BinFiles { + static final String GROUP_ID_ATTRIBUTE = "enrichment.group.id"; + static final String ENRICHMENT_ROLE_ATTRIBUTE = "enrichment.role"; + static final String RECORD_COUNT_ATTRIBUTE = "record.count"; + + static final AllowableValue JOIN_WRAPPER = new AllowableValue("Wrapper", "Wrapper", "The output is a Record that contains two fields: (1) 'original', containing the Record from the original " + + "FlowFile and (2) 'enrichment' containing the corresponding Record from the enrichment FlowFile. Records will be correlated based on their index in the FlowFile. If one FlowFile has more " + + "Records than the other, a null value will be used."); + static final AllowableValue JOIN_SQL = new AllowableValue("SQL", "SQL", "The output is derived by evaluating a SQL SELECT statement that allows for two tables: 'original' and 'enrichment'. This" + + " allows for SQL JOIN statements to be used in order to correlate the Records of the two FlowFiles, so the index in which the Record is encountered in the FlowFile does not matter."); + static final AllowableValue JOIN_INSERT_ENRICHMENT_FIELDS = new AllowableValue("Insert Enrichment Fields", "Insert Enrichment Fields", + "The enrichment is joined together with the original FlowFile by placing all fields of the enrichment Record into the corresponding Record from the original FlowFile. " + + "Records will be correlated based on their index in the FlowFile."); + + static final PropertyDescriptor ORIGINAL_RECORD_READER = new PropertyDescriptor.Builder() + .name("Original Record Reader") + .displayName("Original Record Reader") + .description("The Record Reader for reading the 'original' FlowFile") + .required(true) + .identifiesControllerService(RecordReaderFactory.class) + .build(); + static final PropertyDescriptor ENRICHMENT_RECORD_READER = new PropertyDescriptor.Builder() + .name("Enrichment Record Reader") + .displayName("Enrichment Record Reader") + .description("The Record Reader for reading the 'enrichment' FlowFile") + .required(true) + .identifiesControllerService(RecordReaderFactory.class) + .build(); + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("Record Writer") + .displayName("Record Writer") + .description("The Record Writer to use for writing the results. If the Record Writer is configured to inherit the schema from the Record, the schema that it will inherit will be the result " + + "of merging both the 'original' record schema and the 'enrichment' record schema.") + .required(true) + .identifiesControllerService(RecordSetWriterFactory.class) + .build(); + + // TODO: Include in docs: What to do if we need something more complex like a script: Use Wrapper strategy to correlate the records and then use a ScriptedTransformRecord. Review comment: Yup :) Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
