gresockj commented on a change in pull request #5523: URL: https://github.com/apache/nifi/pull/5523#discussion_r821033524
########## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java ########## @@ -0,0 +1,606 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.pattern.DiscontinuedException; +import org.apache.nifi.processors.standard.db.ColumnDescription; +import org.apache.nifi.processors.standard.db.DatabaseAdapter; +import org.apache.nifi.processors.standard.db.TableNotFoundException; +import org.apache.nifi.processors.standard.db.TableSchema; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.Set; + +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; + +@Tags({"metadata", "jdbc", "database", "table", "update", "alter"}) +@CapabilityDescription("This processor uses a JDBC connection and incoming records to generate any database table changes needed to support the incoming records.") +@WritesAttributes({ + @WritesAttribute(attribute = "output.table", description = "This attribute is written on the flow files routed to the 'success' " + + "and 'failure' relationships, and contains the target table name."), + @WritesAttribute(attribute = "output.path", description = "This attribute is written on the flow files routed to the 'success' " + + "and 'failure' relationships, and contains the path on the file system to the table (or partition location if the table is partitioned)."), + @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer, only if a Record Writer is specified " + + "and Update Field Names is 'true'."), + @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile, only if a Record Writer is specified and Update Field Names is 'true'.") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public class UpdateDatabaseTable extends AbstractProcessor { + + static final AllowableValue CREATE_IF_NOT_EXISTS = new AllowableValue("Create If Not Exists", "Create If Not Exists", + "Create a table with the given schema if it does not already exist"); + static final AllowableValue FAIL_IF_NOT_EXISTS = new AllowableValue("Fail If Not Exists", "Fail If Not Exists", + "If the target does not already exist, log an error and route the flowfile to failure"); + + static final String ATTR_OUTPUT_TABLE = "output.table"; + + // Properties + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("The service for reading incoming flow files. The reader is only used to determine the schema of the records, the actual records will not be processed.") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() + .name("updatedatabasetable-dbcp-service") + .displayName("Database Connection Pooling Service") + .description("The Controller Service that is used to obtain connection(s) to the database") + .required(true) + .identifiesControllerService(DBCPService.class) + .build(); + + static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder() + .name("put-db-record-catalog-name") + .displayName("Catalog Name") + .description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty. Note that if the " + + "property is set and the database is case-sensitive, the catalog name must match the database's catalog name exactly.") + .required(false) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder() + .name("put-db-record-schema-name") + .displayName("Schema Name") + .description("The name of the database schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty. Note that if the " + + "property is set and the database is case-sensitive, the schema name must match the database's schema name exactly.") + .required(false) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("updatedatabasetable-table-name") + .displayName("Table Name") + .description("The name of the database table to update. If the table does not exist, then it will either be created or an error thrown, depending " + + "on the value of the Create Table property.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor CREATE_TABLE = new PropertyDescriptor.Builder() + .name("updatedatabasetable-create-table") + .displayName("Create Table Strategy") + .description("Specifies how to process the target table when it does not exist (create it, fail, e.g.).") + .required(true) + .addValidator(Validator.VALID) + .allowableValues(CREATE_IF_NOT_EXISTS, FAIL_IF_NOT_EXISTS) + .defaultValue(FAIL_IF_NOT_EXISTS.getValue()) + .build(); + + static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder() + .name("updatedatabasetable-translate-field-names") + .displayName("Translate Field Names") + .description("If true, the Processor will attempt to translate field names into the corresponding column names for the table specified, for the purposes of determining whether " + + "the field name exists as a column in the target table. NOTE: If the target table does not exist and is to be created, this property is ignored and the field names will be " + + "used as-is. If false, the field names must match the column names exactly, or the column may not be found and instead an error my be reported that the column already exists.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + static final PropertyDescriptor UPDATE_FIELD_NAMES = new PropertyDescriptor.Builder() + .name("updatedatabasetable-update-field-names") + .displayName("Update Field Names") + .description("This property indicates whether to update the output schema such that the field names are set to the exact column names from the specified " + + "table. This should be used if the incoming record field names may not match the table's column names in terms of upper- and lower-case. For example, this property should be " + + "set to true if the output FlowFile is destined for Oracle e.g., which expects the field names to match the column names exactly. NOTE: The value of the " + + "'Translate Field Names' property is ignored when updating field names; instead they are updated to match the column name as returned by the database.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder() Review comment: This one is still not resolved -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
