NIFI-853: Made updates to processors
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/28549c2b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/28549c2b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/28549c2b Branch: refs/heads/master Commit: 28549c2b1e8970ce8be68e1907830e85676c42a3 Parents: 1a37c95 Author: Mark Payne <[email protected]> Authored: Thu Aug 13 11:40:33 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Wed Aug 19 17:36:29 2015 -0400 ---------------------------------------------------------------------- .../nifi-standard-processors/pom.xml | 7 + .../processors/standard/ConvertJSONToSQL.java | 679 ++++++++++++++ .../apache/nifi/processors/standard/PutSQL.java | 920 +++++++++++++++++++ .../standard/TestConvertJSONToSQL.java | 431 +++++++++ .../nifi/processors/standard/TestPutSQL.java | 664 +++++++++++++ .../malformed-person-extra-comma.json | 4 + .../TestConvertJSONToSQL/person-1.json | 5 + .../person-with-extra-field.json | 6 + .../person-with-null-code.json | 5 + .../person-without-code.json | 4 + .../TestConvertJSONToSQL/person-without-id.json | 4 + .../resources/TestConvertJSONToSQL/persons.json | 21 + .../standard/ConvertFlatJSONToSQL.java | 600 ------------ .../apache/nifi/processors/standard/PutSQL.java | 203 ---- .../standard/TestConvertFlatJSONToSQL.java | 294 ------ .../nifi/processors/standard/TestPutSQL.java | 246 ----- .../malformed-person-extra-comma.json | 4 - .../TestConvertFlatJSONToSQL/person-1.json | 5 - .../person-without-code.json | 4 - .../person-without-id.json | 4 - pom.xml | 5 + 21 files changed, 2755 insertions(+), 1360 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index fd267d1..729b495 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -220,6 +220,13 @@ language governing permissions and limitations under the License. --> <exclude>src/test/resources/TestMergeContent/head</exclude> <exclude>src/test/resources/TestMergeContent/user.avsc</exclude> <exclude>src/test/resources/TestMergeContent/place.avsc</exclude> + <exclude>src/test/resources/TestConvertJSONToSQL/person-1.json</exclude> + <exclude>src/test/resources/TestConvertJSONToSQL/persons.json</exclude> + <exclude>src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json</exclude> + <exclude>src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json</exclude> + <exclude>src/test/resources/TestConvertJSONToSQL/person-without-code.json</exclude> + <exclude>src/test/resources/TestConvertJSONToSQL/person-with-null-code.json</exclude> + <exclude>src/test/resources/TestConvertJSONToSQL/person-without-id.json</exclude> <exclude>src/test/resources/TestModifyBytes/noFooter.txt</exclude> <exclude>src/test/resources/TestModifyBytes/noFooter_noHeader.txt</exclude> <exclude>src/test/resources/TestModifyBytes/noHeader.txt</exclude> http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java new file mode 100644 index 
0000000..7eda593 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -0,0 +1,679 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.standard;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.JsonNodeFactory;

/**
 * Converts flat JSON documents (or arrays of them) into parameterized INSERT or UPDATE statements.
 *
 * <p>Column names and JDBC types are looked up from the database metadata of the configured table. Each generated
 * statement is written as the content of a new FlowFile. Its parameters are carried in
 * {@code sql.args.N.type} / {@code sql.args.N.value} attributes so that no JSON value is ever concatenated into SQL.</p>
 */
@SideEffectFree
@SupportsBatching
@SeeAlso(PutSQL.class)
@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"})
@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be "
    + "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to "
    + "a JSON object, that JSON object will be interpreted as Text. If the input is an array of JSON elements, each element in the array is "
    + "output as a separate FlowFile to the 'sql' relationship. Upon successful conversion, the original FlowFile is routed to the 'original' "
    + "relationship and the SQL is routed to the 'sql' relationship.")
@WritesAttributes({
    @WritesAttribute(attribute="mime.type", description="Sets mime.type of FlowFile that is routed to 'sql' to 'text/plain'."),
    @WritesAttribute(attribute="sql.table", description="Sets the sql.table attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement."),
    @WritesAttribute(attribute="sql.catalog", description="If the Catalog name is set for this database, specifies the name of the catalog that the SQL statement will update. "
        + "If no catalog is used, this attribute will not be added."),
    @WritesAttribute(attribute="fragment.identifier", description="All FlowFiles routed to the 'sql' relationship for the same incoming FlowFile (multiple will be output for the same incoming "
        + "FlowFile if the incoming FlowFile is a JSON Array) will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
    @WritesAttribute(attribute="fragment.count", description="The number of SQL FlowFiles that were produced for same incoming FlowFile. This can be used in conjunction with the "
        + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming FlowFile."),
    @WritesAttribute(attribute="fragment.index", description="The position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same incoming FlowFile. This can be "
        + "used in conjunction with the fragment.identifier and fragment.count attributes to know which FlowFiles originated from the same incoming FlowFile and in what order the SQL "
        + "FlowFiles were produced"),
    @WritesAttribute(attribute="sql.args.N.type", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The types of the Parameters "
        + "to use are stored in attributes named sql.args.1.type, sql.args.2.type, sql.args.3.type, and so on. The type is a number representing a JDBC Type constant. "
        + "Generally, this is useful only for software to read and interpret but is added so that a processor such as PutSQL can understand how to interpret the values."),
    @WritesAttribute(attribute="sql.args.N.value", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The values of the Parameters "
        + "to use are stored in the attributes named sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a corresponding "
        + "sql.args.N.type attribute that indicates how the value should be interpreted when inserting it into the database.")
})
public class ConvertJSONToSQL extends AbstractProcessor {
    private static final String UPDATE_TYPE = "UPDATE";
    private static final String INSERT_TYPE = "INSERT";

    /** Maximum number of table schemas kept in the schema cache before the oldest entry is evicted. */
    private static final int SCHEMA_CACHE_CAPACITY = 100;

    /** Shared JSON parser; Jackson documents ObjectMapper as thread-safe once configured, and it is never reconfigured here. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
        "Any field in the JSON document that cannot be mapped to a column in the database is ignored");
    static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail",
        "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");

    static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
        .name("JDBC Connection Pool")
        .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
            + "The Connection Pool is necessary in order to determine the appropriate database column types.")
        .identifiesControllerService(DBCPService.class)
        .required(true)
        .build();
    static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
        .name("Statement Type")
        .description("Specifies the type of SQL Statement to generate")
        .required(true)
        .allowableValues(UPDATE_TYPE, INSERT_TYPE)
        .build();
    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
        .name("Table Name")
        .description("The name of the table that the statement should update")
        .required(true)
        .expressionLanguageSupported(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build();
    static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder()
        .name("Catalog Name")
        .description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty")
        .required(false)
        .expressionLanguageSupported(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build();
    static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder()
        .name("Translate Field Names")
        .description("If true, the Processor will attempt to translate JSON field names into the appropriate column names for the table specified. "
            + "If false, the JSON field names must match the column names exactly, or the column will not be updated")
        .allowableValues("true", "false")
        .defaultValue("true")
        .build();
    static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new PropertyDescriptor.Builder()
        .name("Unmatched Field Behavior")
        .description("If an incoming JSON element has a field that does not map to any of the database table's columns, this property specifies how to handle the situation")
        .allowableValues(IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD)
        .defaultValue(IGNORE_UNMATCHED_FIELD.getValue())
        .build();
    static final PropertyDescriptor UPDATE_KEY = new PropertyDescriptor.Builder()
        .name("Update Keys")
        .description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. "
            + "If the Statement Type is UPDATE and this property is not set, the table's Primary Keys are used. "
            + "In this case, if no Primary Key exists, the conversion to SQL will fail. "
            + "This property is ignored if the Statement Type is INSERT")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .required(false)
        .expressionLanguageSupported(true)
        .build();


    static final Relationship REL_ORIGINAL = new Relationship.Builder()
        .name("original")
        .description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship")
        .build();
    static final Relationship REL_SQL = new Relationship.Builder()
        .name("sql")
        .description("A FlowFile is routed to this relationship when its contents have successfully been converted into a SQL statement")
        .build();
    static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("A FlowFile is routed to this relationship if it cannot be converted into a SQL statement. Common causes include invalid JSON "
            + "content or the JSON content missing a required field (if using an INSERT statement type).")
        .build();

    /**
     * Bounded, insertion-ordered cache of table schemas. It is a LinkedHashMap rather than a ConcurrentMap so that
     * old entries can be evicted once the capacity is exceeded. This keeps the Java heap bounded when many different
     * tables are referenced. All access is guarded by {@code synchronized (this)}.
     */
    private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(SCHEMA_CACHE_CAPACITY) {
        private static final long serialVersionUID = 1L;

        @Override
        protected boolean removeEldestEntry(final Map.Entry<SchemaKey, TableSchema> eldest) {
            // Evict only once the capacity is exceeded. Returning true unconditionally would remove every
            // entry immediately after it is inserted and disable caching altogether.
            return size() > SCHEMA_CACHE_CAPACITY;
        }
    };

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(CONNECTION_POOL);
        properties.add(STATEMENT_TYPE);
        properties.add(TABLE_NAME);
        properties.add(CATALOG_NAME);
        properties.add(TRANSLATE_FIELD_NAMES);
        properties.add(UNMATCHED_FIELD_BEHAVIOR);
        properties.add(UPDATE_KEY);
        return properties;
    }


    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_ORIGINAL);
        rels.add(REL_SQL);
        rels.add(REL_FAILURE);
        return rels;
    }


    /**
     * Clears cached table schemas whenever the processor is (re)scheduled.
     * Properties such as Translate Field Names can only change while the processor is stopped.
     */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        synchronized (this) {
            schemaCache.clear();
        }
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
        final boolean ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
        final String statementType = context.getProperty(STATEMENT_TYPE).getValue();
        final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue();

        final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;

        // Update Keys support Expression Language, so includePrimaryKeys can differ between FlowFiles for the same
        // table. It is part of the cache key so that a schema loaded without primary-key information is never reused
        // for a FlowFile that relies on the table's primary keys.
        final SchemaKey schemaKey = new SchemaKey(catalog, tableName, includePrimaryKeys);

        final TableSchema schema;
        try {
            schema = getTableSchema(context, schemaKey, catalog, tableName, translateFieldNames, includePrimaryKeys);
        } catch (final SQLException e) {
            getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        // Parse the JSON document
        final JsonNode rootNode;
        try {
            rootNode = readJson(session, flowFile);
        } catch (final ProcessException pe) {
            getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        // NOTE(review): depending on the Jackson version, readTree may return null for empty content; guard against it.
        if (rootNode == null) {
            getLogger().error("Failed to parse {} as JSON because it does not contain a JSON document; routing to failure", new Object[] {flowFile});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        // The node may or may not be a JSON Array. If it isn't, wrap it in a single-element ArrayNode so that both
        // shapes are handled by the same loop below.
        final ArrayNode arrayNode;
        if (rootNode.isArray()) {
            arrayNode = (ArrayNode) rootNode;
        } else {
            arrayNode = new ArrayNode(JsonNodeFactory.instance);
            arrayNode.add(rootNode);
        }

        final String fragmentIdentifier = UUID.randomUUID().toString();

        final Set<FlowFile> created = new HashSet<>();
        for (int i = 0; i < arrayNode.size(); i++) {
            final JsonNode jsonNode = arrayNode.get(i);

            final String sql;
            final Map<String, String> attributes = new HashMap<>();

            try {
                if (INSERT_TYPE.equals(statementType)) {
                    sql = generateInsert(jsonNode, attributes, tableName, schema, translateFieldNames, ignoreUnmappedFields);
                } else {
                    sql = generateUpdate(jsonNode, attributes, tableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
                }
            } catch (final ProcessException pe) {
                getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
                    new Object[] { flowFile, statementType, pe.toString() }, pe);
                // All-or-nothing: discard SQL FlowFiles already produced for earlier array elements.
                session.remove(created);
                session.transfer(flowFile, REL_FAILURE);
                return;
            }

            FlowFile sqlFlowFile = session.create(flowFile);
            created.add(sqlFlowFile);

            sqlFlowFile = session.write(sqlFlowFile, new OutputStreamCallback() {
                @Override
                public void process(final OutputStream out) throws IOException {
                    out.write(sql.getBytes(StandardCharsets.UTF_8));
                }
            });

            attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
            attributes.put("sql.table", tableName);
            attributes.put("fragment.identifier", fragmentIdentifier);
            attributes.put("fragment.count", String.valueOf(arrayNode.size()));
            attributes.put("fragment.index", String.valueOf(i));

            if (catalog != null) {
                attributes.put("sql.catalog", catalog);
            }

            sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes);
            session.transfer(sqlFlowFile, REL_SQL);
        }

        session.transfer(flowFile, REL_ORIGINAL);
    }

    /**
     * Returns the cached schema for the given key, querying the database metadata and caching the result on a miss.
     * The method is synchronized because the bounded LinkedHashMap cache is not thread-safe.
     *
     * @throws SQLException if the database metadata cannot be read
     */
    private synchronized TableSchema getTableSchema(final ProcessContext context, final SchemaKey schemaKey, final String catalog,
            final String tableName, final boolean translateFieldNames, final boolean includePrimaryKeys) throws SQLException {
        TableSchema schema = schemaCache.get(schemaKey);
        if (schema == null) {
            final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
            try (final Connection conn = dbcpService.getConnection()) {
                schema = TableSchema.from(conn, catalog, tableName, translateFieldNames, includePrimaryKeys);
            }
            schemaCache.put(schemaKey, schema);
        }
        return schema;
    }

    /**
     * Reads and parses the FlowFile content as a JSON tree.
     *
     * @return the root node, or whatever the JSON parser returns for empty content
     * @throws ProcessException if the content cannot be read or is not valid JSON
     */
    private JsonNode readJson(final ProcessSession session, final FlowFile flowFile) {
        final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null);
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(final InputStream in) throws IOException {
                try (final InputStream bufferedIn = new BufferedInputStream(in)) {
                    rootNodeRef.set(MAPPER.readTree(bufferedIn));
                }
            }
        });
        return rootNodeRef.get();
    }

    /** Returns the normalized form of every field name of the given JSON object. */
    private Set<String> getNormalizedColumnNames(final JsonNode node, final boolean translateFieldNames) {
        final Set<String> normalizedFieldNames = new HashSet<>();
        final Iterator<String> fieldNameItr = node.getFieldNames();
        while (fieldNameItr.hasNext()) {
            normalizedFieldNames.add(normalizeColumnName(fieldNameItr.next(), translateFieldNames));
        }

        return normalizedFieldNames;
    }

    /**
     * Builds a parameterized INSERT statement for one JSON object and records its parameters in {@code attributes}.
     *
     * @throws ProcessException if a required column has no JSON field, if a field is unmapped and unmapped fields are
     *         not ignored, or if no field maps to a column
     */
    private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
        final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {

        final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
        for (final String requiredColName : schema.getRequiredColumnNames()) {
            final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
            if (!normalizedFieldNames.contains(normalizedColName)) {
                throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
            }
        }

        final StringBuilder sqlBuilder = new StringBuilder();
        int fieldCount = 0;
        sqlBuilder.append("INSERT INTO ").append(tableName).append(" (");

        // Add each mapped column name to the statement and record its type/value as sql.args.N.* attributes.
        final Iterator<String> fieldNames = rootNode.getFieldNames();
        while (fieldNames.hasNext()) {
            final String fieldName = fieldNames.next();

            final ColumnDescription desc = schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
            if (desc == null) {
                if (!ignoreUnmappedFields) {
                    throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database");
                }
                continue;
            }

            if (fieldCount++ > 0) {
                sqlBuilder.append(", ");
            }

            sqlBuilder.append(desc.getColumnName());
            addParameter(attributes, fieldCount, desc, rootNode.get(fieldName));
        }

        if (fieldCount == 0) {
            throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table");
        }

        // Complete the statement with one '?' placeholder per column.
        sqlBuilder.append(") VALUES (");
        for (int i = 0; i < fieldCount; i++) {
            if (i > 0) {
                sqlBuilder.append(", ");
            }

            sqlBuilder.append("?");
        }
        sqlBuilder.append(")");

        return sqlBuilder.toString();
    }

    /**
     * Builds a parameterized UPDATE statement for one JSON object and records its parameters in {@code attributes}.
     * SET parameters are numbered first, followed by the WHERE-clause (key) parameters.
     *
     * @throws ProcessException if no key columns are available, a key column is missing from the JSON or the table,
     *         a key value is null, an unmapped field is found while unmapped fields are not ignored, or no non-key
     *         column is present to update
     */
    private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
        final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {

        final Set<String> updateKeyNames;
        if (updateKeys == null) {
            updateKeyNames = schema.getPrimaryKeyColumnNames();
        } else {
            updateKeyNames = new HashSet<>();
            for (final String updateKey : updateKeys.split(",")) {
                final String trimmed = updateKey.trim();
                // Tolerate stray commas such as "id,,code" or a trailing ","
                if (!trimmed.isEmpty()) {
                    updateKeyNames.add(trimmed);
                }
            }
        }

        if (updateKeyNames.isEmpty()) {
            throw new ProcessException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
        }

        final String keyType = (updateKeys == null) ? "Primary" : "Update";

        // Normalize all key names and verify that each one is present both in the JSON and in the table. A key that
        // does not map to a column would otherwise be dropped from the WHERE clause and widen the set of rows updated.
        final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
        final Set<String> normalizedUpdateNames = new HashSet<>();
        for (final String uk : updateKeyNames) {
            final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
            normalizedUpdateNames.add(normalizedUK);

            if (!normalizedFieldNames.contains(normalizedUK)) {
                throw new ProcessException("JSON does not have a value for the " + keyType + " Key column '" + uk + "'");
            }
            if (!schema.getColumns().containsKey(normalizedUK)) {
                throw new ProcessException(keyType + " Key column '" + uk + "' does not exist in table '" + tableName + "'");
            }
        }

        final StringBuilder sqlBuilder = new StringBuilder();
        int fieldCount = 0;
        sqlBuilder.append("UPDATE ").append(tableName).append(" SET ");

        // SET clause: every mapped, non-key field.
        Iterator<String> fieldNames = rootNode.getFieldNames();
        while (fieldNames.hasNext()) {
            final String fieldName = fieldNames.next();

            final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
            final ColumnDescription desc = schema.getColumns().get(normalizedColName);

            if (desc == null) {
                if (!ignoreUnmappedFields) {
                    throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database");
                }
                continue;
            }

            // Key columns belong to the WHERE clause, which is built after the SET clause.
            if (normalizedUpdateNames.contains(normalizedColName)) {
                continue;
            }

            if (fieldCount++ > 0) {
                sqlBuilder.append(", ");
            }

            sqlBuilder.append(desc.getColumnName()).append(" = ?");
            addParameter(attributes, fieldCount, desc, rootNode.get(fieldName));
        }

        // "UPDATE t SET  WHERE ..." is not valid SQL, so fail early instead of emitting it.
        if (fieldCount == 0) {
            throw new ProcessException("None of the fields in the JSON map to non-key columns defined by the " + tableName + " table");
        }

        // WHERE clause: one equality condition per key column, using the table's real column name.
        sqlBuilder.append(" WHERE ");

        fieldNames = rootNode.getFieldNames();
        int whereFieldCount = 0;
        while (fieldNames.hasNext()) {
            final String fieldName = fieldNames.next();

            final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
            final ColumnDescription desc = schema.getColumns().get(normalizedColName);
            if (desc == null || !normalizedUpdateNames.contains(normalizedColName)) {
                continue;
            }

            final JsonNode keyNode = rootNode.get(fieldName);
            if (keyNode == null || keyNode.isNull()) {
                // "col = NULL" never matches a row, and the literal text "null" would compare against the wrong value.
                throw new ProcessException("JSON has a null value for the " + keyType + " Key column '" + fieldName + "'");
            }

            if (whereFieldCount++ > 0) {
                sqlBuilder.append(" AND ");
            }
            fieldCount++;

            sqlBuilder.append(desc.getColumnName()).append(" = ?");
            addParameter(attributes, fieldCount, desc, keyNode);
        }

        return sqlBuilder.toString();
    }

    /**
     * Records the JDBC type of statement parameter {@code index} as {@code sql.args.N.type}. Unless the JSON value is
     * null or absent, it also records the value as {@code sql.args.N.value}.
     *
     * <p>JSON objects and arrays are stored as their JSON text, so such fields are "interpreted as Text". Values are
     * truncated to the column size only for character columns. For numeric and temporal columns the reported
     * column size is a precision, not a character length, and truncation would silently corrupt the value.</p>
     */
    private static void addParameter(final Map<String, String> attributes, final int index, final ColumnDescription desc, final JsonNode fieldNode) {
        final int sqlType = desc.getDataType();
        attributes.put("sql.args." + index + ".type", String.valueOf(sqlType));

        if (fieldNode == null || fieldNode.isNull()) {
            return;
        }

        String fieldValue = fieldNode.isContainerNode() ? fieldNode.toString() : fieldNode.asText();
        final Integer colSize = desc.getColumnSize();
        if (colSize != null && isCharacterType(sqlType) && fieldValue.length() > colSize) {
            fieldValue = fieldValue.substring(0, colSize);
        }
        attributes.put("sql.args." + index + ".value", fieldValue);
    }

    /** Returns true for JDBC character types, whose COLUMN_SIZE is a maximum length in characters. */
    private static boolean isCharacterType(final int sqlType) {
        switch (sqlType) {
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.LONGVARCHAR:
            case Types.NCHAR:
            case Types.NVARCHAR:
            case Types.LONGNVARCHAR:
                return true;
            default:
                return false;
        }
    }

    /**
     * Normalizes a field or column name for matching. When translation is enabled the name is upper-cased with
     * locale-independent rules and all underscores are removed; otherwise it is returned unchanged.
     */
    private static String normalizeColumnName(final String colName, final boolean translateColumnNames) {
        return translateColumnNames ? colName.toUpperCase(Locale.ROOT).replace("_", "") : colName;
    }

    /** Column metadata for one table, keyed by normalized column name. */
    private static class TableSchema {
        private final List<String> requiredColumnNames;
        private final Set<String> primaryKeyColumnNames;
        private final Map<String, ColumnDescription> columns;

        private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
            final Set<String> primaryKeyColumnNames) {
            this.columns = new HashMap<>();
            this.primaryKeyColumnNames = primaryKeyColumnNames;

            this.requiredColumnNames = new ArrayList<>();
            for (final ColumnDescription desc : columnDescriptions) {
                columns.put(ConvertJSONToSQL.normalizeColumnName(desc.columnName, translateColumnNames), desc);
                if (desc.isRequired()) {
                    requiredColumnNames.add(desc.columnName);
                }
            }
        }

        public Map<String, ColumnDescription> getColumns() {
            return columns;
        }

        public List<String> getRequiredColumnNames() {
            return requiredColumnNames;
        }

        /** Normalized primary-key column names; empty if primary keys were not requested or the table has none. */
        public Set<String> getPrimaryKeyColumnNames() {
            return primaryKeyColumnNames;
        }

        /**
         * Loads the table's columns (and, if requested, its primary keys) from the connection's metadata.
         * Both metadata ResultSets are closed before returning.
         */
        public static TableSchema from(final Connection conn, final String catalog, final String tableName,
            final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {

            final List<ColumnDescription> cols = new ArrayList<>();
            try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%")) {
                while (colrs.next()) {
                    cols.add(ColumnDescription.from(colrs));
                }
            }

            final Set<String> primaryKeyColumns = new HashSet<>();
            if (includePrimaryKeys) {
                try (final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName)) {
                    while (pkrs.next()) {
                        final String colName = pkrs.getString("COLUMN_NAME");
                        primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames));
                    }
                }
            }

            return new TableSchema(cols, translateColumnNames, primaryKeyColumns);
        }
    }

    /** Name, JDBC type, size and required-ness of a single table column. */
    private static class ColumnDescription {
        private final String columnName;
        private final int dataType;
        private final boolean required;
        private final Integer columnSize;

        private ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) {
            this.columnName = columnName;
            this.dataType = dataType;
            this.required = required;
            this.columnSize = columnSize;
        }

        public int getDataType() {
            return dataType;
        }

        /** The reported COLUMN_SIZE, or null when the driver reported 0 / SQL NULL. */
        public Integer getColumnSize() {
            return columnSize;
        }

        public String getColumnName() {
            return columnName;
        }

        /** True when the column is NOT NULL, not auto-increment, and has no default value. */
        public boolean isRequired() {
            return required;
        }

        /** Builds a description from the current row of a {@code DatabaseMetaData.getColumns} result. */
        public static ColumnDescription from(final ResultSet resultSet) throws SQLException {
            final String columnName = resultSet.getString("COLUMN_NAME");
            final int dataType = resultSet.getInt("DATA_TYPE");
            final int colSize = resultSet.getInt("COLUMN_SIZE");

            // JDBC defines "" as "nullability unknown"; treat an unknown (or missing) value as nullable.
            final String nullableValue = resultSet.getString("IS_NULLABLE");
            final boolean isNullable = nullableValue == null || nullableValue.isEmpty() || "YES".equalsIgnoreCase(nullableValue);
            final String defaultValue = resultSet.getString("COLUMN_DEF");
            final String autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT");
            final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
            final boolean required = !isNullable && !isAutoIncrement && defaultValue == null;

            return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize);
        }
    }

    /** Cache key identifying a table schema, including whether primary-key metadata was loaded for it. */
    private static class SchemaKey {
        private final String catalog;
        private final String tableName;
        private final boolean includePrimaryKeys;

        public SchemaKey(final String catalog, final String tableName, final boolean includePrimaryKeys) {
            this.catalog = catalog;
            this.tableName = tableName;
            this.includePrimaryKeys = includePrimaryKeys;
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((catalog == null) ? 0 : catalog.hashCode());
            result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
            result = prime * result + (includePrimaryKeys ? 1231 : 1237);
            return result;
        }

        @Override
        public boolean equals(final Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }

            final SchemaKey other = (SchemaKey) obj;
            if (includePrimaryKeys != other.includePrimaryKeys) {
                return false;
            }

            if (catalog == null) {
                if (other.catalog != null) {
                    return false;
                }
            } else if (!catalog.equals(other.catalog)) {
                return false;
            }

            if (tableName == null) {
                if (other.tableName != null) {
                    return false;
                }
            } else if (!tableName.equals(other.tableName)) {
                return false;
            }

            return true;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java new file mode 100644 index 0000000..b087737 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -0,0 +1,920 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.sql.BatchUpdateException; +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLNonTransientException; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import 
org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.FlowFileFilter; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; + +@SupportsBatching +@SeeAlso(ConvertJSONToSQL.class) +@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"}) +@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command " + + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes " + + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be " + + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.") +@ReadsAttributes({ + @ReadsAttribute(attribute="fragment.identifier", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine whether or " + + "not two FlowFiles belong to the same transaction."), + @ReadsAttribute(attribute="fragment.count", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine how many FlowFiles " + + "are needed to complete the transaction."), + @ReadsAttribute(attribute="fragment.index", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine the order that the FlowFiles " + + "in a transaction should be evaluated."), + @ReadsAttribute(attribute="sql.args.N.type", description="Incoming FlowFiles are expected to be parameterized SQL statements. 
The type of each Parameter is specified as an integer " + + "that represents the JDBC Type of the parameter."), + @ReadsAttribute(attribute="sql.args.N.value", description="Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as " + + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.") +}) +@WritesAttributes({ + @WritesAttribute(attribute="sql.generated.key", description="If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, " + + "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.") +}) +public class PutSQL extends AbstractProcessor { + + static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder() + .name("JDBC Connection Pool") + .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. " + + "The Connection Pool is necessary in order to determine the appropriate database column types.") + .identifiesControllerService(DBCPService.class) + .required(true) + .build(); + static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder() + .name("Support Fragmented Transactions") + .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. " + + "If the fragment.count value is greater than 1, the Processor will not process any FlowFile will that fragment.identifier until all are available; " + + "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. " + + "This Provides atomicity of those SQL statements. 
If this value is false, these attributes will be ignored and the updates will occur independent of one another.") + .allowableValues("true", "false") + .defaultValue("true") + .build(); + static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder() + .name("Transaction Timeout") + .description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute " + + "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship") + .required(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The preferred number of FlowFiles to put to the database in a single transaction") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("100") + .build(); + // The attribute name below must match the "sql.generated.key" attribute that onTrigger writes. + static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new PropertyDescriptor.Builder() + .name("Obtain Generated Keys") + .description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generated.key attribute.
" + + "This may result in slightly slower performance and is not supported by all databases.") + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is routed to this relationship after the database is successfully updated") + .build(); + static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, " + + "such as an invalid query or an integrity constraint violation") + .build(); + + private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type"); + private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); + + private static final String FRAGMENT_ID_ATTR = "fragment.identifier"; + private static final String FRAGMENT_INDEX_ATTR = "fragment.index"; + private static final String FRAGMENT_COUNT_ATTR = "fragment.count"; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(CONNECTION_POOL); + properties.add(SUPPORT_TRANSACTIONS); + properties.add(TRANSACTION_TIMEOUT); + properties.add(BATCH_SIZE); + properties.add(OBTAIN_GENERATED_KEYS); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_RETRY); + rels.add(REL_FAILURE); + return rels; + } + + + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws 
ProcessException { + final FlowFilePoll poll = pollFlowFiles(context, session); + if (poll == null) { + return; + } + + final List<FlowFile> flowFiles = poll.getFlowFiles(); + if (flowFiles == null) { + return; + } + + final long startNanos = System.nanoTime(); + final boolean obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean(); + final Map<String, StatementFlowFileEnclosure> statementMap = new HashMap<>(); // Map SQL to a PreparedStatement and FlowFiles + final List<FlowFile> sentFlowFiles = new ArrayList<>(); // flowfiles that have been sent + final List<FlowFile> processedFlowFiles = new ArrayList<>(); // all flowfiles that we have processed + final Set<StatementFlowFileEnclosure> enclosuresToExecute = new LinkedHashSet<>(); // the enclosures that we've processed + + // Because we can have a transaction that is necessary across multiple FlowFiles, things get complicated when + // some FlowFiles have been transferred to a relationship and then there is a failure. As a result, we will just + // map all FlowFiles to their destination relationship and do the session.transfer at the end. This way, if there + // is a failure, we can route all FlowFiles to failure if we need to. + final Map<FlowFile, Relationship> destinationRelationships = new HashMap<>(); + + final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); + try (final Connection conn = dbcpService.getConnection()) { + final boolean originalAutoCommit = conn.getAutoCommit(); + try { + conn.setAutoCommit(false); + + for (final FlowFile flowFile : flowFiles) { + processedFlowFiles.add(flowFile); + final String sql = getSQL(session, flowFile); + + // Get the appropriate PreparedStatement to use. 
+ final StatementFlowFileEnclosure enclosure; + try { + enclosure = getEnclosure(sql, conn, statementMap, obtainKeys, poll.isFragmentedTransaction()); + } catch (final SQLNonTransientException e) { + getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e}); + destinationRelationships.put(flowFile, REL_FAILURE); + continue; + } + + final PreparedStatement stmt = enclosure.getStatement(); + + // set the appropriate parameters on the statement. + try { + setParameters(stmt, flowFile.getAttributes()); + } catch (final SQLException | ProcessException pe) { + getLogger().error("Cannot update database for {} due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe); + destinationRelationships.put(flowFile, REL_FAILURE); + continue; + } + + // If we need to obtain keys, we cannot do so in a a Batch Update. So we have to execute the statement and close it. + if (obtainKeys) { + try { + // Execute the actual update. + stmt.executeUpdate(); + + // attempt to determine the key that was generated, if any. This is not supported by all + // database vendors, so if we cannot determine the generated key (or if the statement is not an INSERT), + // we will just move on without setting the attribute. 
+ FlowFile sentFlowFile = flowFile; + final String generatedKey = determineGeneratedKey(stmt); + if (generatedKey != null) { + sentFlowFile = session.putAttribute(sentFlowFile, "sql.generated.key", generatedKey); + } + + stmt.close(); + sentFlowFiles.add(sentFlowFile); + } catch (final SQLNonTransientException e) { + getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e}); + destinationRelationships.put(flowFile, REL_FAILURE); + continue; + } catch (final SQLException e) { + getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {flowFile, e}); + destinationRelationships.put(flowFile, REL_RETRY); + continue; + } + } else { + // We don't need to obtain keys. Just add the statement to the batch. + stmt.addBatch(); + enclosure.addFlowFile(flowFile); + enclosuresToExecute.add(enclosure); + } + } + + // If we are not trying to obtain the generated keys, we will have + // PreparedStatement's that have batches added to them. We need to execute each batch and close + // the PreparedStatement. + for (final StatementFlowFileEnclosure enclosure : enclosuresToExecute) { + try { + final PreparedStatement stmt = enclosure.getStatement(); + stmt.executeBatch(); + sentFlowFiles.addAll(enclosure.getFlowFiles()); + } catch (final BatchUpdateException e) { + // If we get a BatchUpdateException, then we want to determine which FlowFile caused the failure, + // and route that FlowFile to failure while routing those that finished processing to success and those + // that have not yet been executed to retry. If the FlowFile was + // part of a fragmented transaction, then we must roll back all updates for this connection, because + // other statements may have been successful and been part of this transaction. 
+ final int[] updateCounts = e.getUpdateCounts(); + final int offendingFlowFileIndex = updateCounts.length; + final List<FlowFile> batchFlowFiles = enclosure.getFlowFiles(); + + if (poll.isFragmentedTransaction()) { + // There are potentially multiple statements for this one transaction. As a result, + // we need to roll back the entire transaction and route all of the FlowFiles to failure. + conn.rollback(); + final FlowFile offendingFlowFile = batchFlowFiles.get(offendingFlowFileIndex); + getLogger().error("Failed to update database due to a failed batch update. A total of {} FlowFiles are required for this transaction, so routing all to failure. " + + "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e}); + session.transfer(flowFiles, REL_FAILURE); + return; + } + + // In the presence of a BatchUpdateException, the driver has the option of either stopping when an error + // occurs, or continuing. If it continues, then it must account for all statements in the batch and for + // those that fail return a Statement.EXECUTE_FAILED for the number of rows updated. + // So we will iterate over all of the update counts returned. If any is equal to Statement.EXECUTE_FAILED, + // we will route the corresponding FlowFile to failure. Otherwise, the FlowFile will go to success + // unless it has not yet been processed (its index in the List > updateCounts.length). 
+ int failureCount = 0; + int successCount = 0; + int retryCount = 0; + for (int i=0; i < updateCounts.length; i++) { + final int updateCount = updateCounts[i]; + final FlowFile flowFile = batchFlowFiles.get(i); + if (updateCount == Statement.EXECUTE_FAILED) { + destinationRelationships.put(flowFile, REL_FAILURE); + failureCount++; + } else { + destinationRelationships.put(flowFile, REL_SUCCESS); + successCount++; + } + } + + if (failureCount == 0) { + // if no failures found, the driver decided not to execute the statements after the + // failure, so route the last one to failure. + final FlowFile failedFlowFile = batchFlowFiles.get(updateCounts.length); + destinationRelationships.put(failedFlowFile, REL_FAILURE); + failureCount++; + } + + if (updateCounts.length < batchFlowFiles.size()) { + final List<FlowFile> unexecuted = batchFlowFiles.subList(updateCounts.length + 1, batchFlowFiles.size()); + for (final FlowFile flowFile : unexecuted) { + destinationRelationships.put(flowFile, REL_RETRY); + retryCount++; + } + } + + getLogger().error("Failed to update database due to a failed batch update. 
There were a total of {} FlowFiles that failed, {} that succeeded, " + + "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount}); + } catch (final SQLNonTransientException e) { + getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e}); + + for (final FlowFile flowFile : enclosure.getFlowFiles()) { + destinationRelationships.put(flowFile, REL_FAILURE); + } + continue; + } catch (final SQLException e) { + getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", + new Object[] {enclosure.getFlowFiles(), e}); + + for (final FlowFile flowFile : enclosure.getFlowFiles()) { + destinationRelationships.put(flowFile, REL_RETRY); + } + continue; + } finally { + enclosure.getStatement().close(); + } + } + } finally { + try { + conn.commit(); + } finally { + // make sure that we try to set the auto commit back to whatever it was. 
+ if (originalAutoCommit) { + try { + conn.setAutoCommit(originalAutoCommit); + } catch (final SQLException se) { + } + } + } + } + + // Determine the database URL + String url = "jdbc://unknown-host"; + try { + url = conn.getMetaData().getURL(); + } catch (final SQLException sqle) { + } + + // Emit a Provenance SEND event + final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + for (final FlowFile flowFile : sentFlowFiles) { + session.getProvenanceReporter().send(flowFile, url, transmissionMillis, true); + } + + for (final FlowFile flowFile : sentFlowFiles) { + destinationRelationships.put(flowFile, REL_SUCCESS); + } + } catch (final SQLException e) { + // Failed FlowFiles are all of them that we have processed minus those that were successfully sent + final List<FlowFile> failedFlowFiles = processedFlowFiles; + failedFlowFiles.removeAll(sentFlowFiles); + + // All FlowFiles yet to be processed is all FlowFiles minus those processed + final List<FlowFile> retry = flowFiles; + retry.removeAll(processedFlowFiles); + + final Relationship rel; + if (e instanceof SQLNonTransientException) { + getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {failedFlowFiles, e}); + rel = REL_FAILURE; + } else { + getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {failedFlowFiles, e}); + rel = REL_RETRY; + } + + for (final FlowFile flowFile : failedFlowFiles) { + destinationRelationships.put(flowFile, rel); + } + + for (final FlowFile flowFile : retry) { + destinationRelationships.put(flowFile, Relationship.SELF); + } + } + + for (final Map.Entry<FlowFile, Relationship> entry : destinationRelationships.entrySet()) { + session.transfer(entry.getKey(), entry.getValue()); + } + } + + + /** + * Pulls a batch of FlowFiles from the incoming queues. 
If no FlowFiles are available, returns <code>null</code>. + * Otherwise, a List of FlowFiles will be returned. + * + * If all FlowFiles pulled are not eligible to be processed, the FlowFiles will be penalized and transferred back + * to the input queue and an empty List will be returned. + * + * Otherwise, if the Support Fragmented Transactions property is true, all FlowFiles that belong to the same + * transaction will be sorted in the order that they should be evaluated. + * + * @param context the process context for determining properties + * @param session the process session for pulling flowfiles + * @return a FlowFilePoll containing a List of FlowFiles to process, or <code>null</code> if there are no FlowFiles to process + */ + private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSession session) { + // Determine which FlowFile Filter to use in order to obtain FlowFiles. + final boolean useTransactions = context.getProperty(SUPPORT_TRANSACTIONS).asBoolean(); + boolean fragmentedTransaction = false; + + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + List<FlowFile> flowFiles; + if (useTransactions) { + final TransactionalFlowFileFilter filter = new TransactionalFlowFileFilter(); + flowFiles = session.get(filter); + fragmentedTransaction = filter.isFragmentedTransaction(); + } else { + flowFiles = session.get(batchSize); + } + + if (flowFiles.isEmpty()) { + return null; + } + + // If we are supporting fragmented transactions, verify that all FlowFiles are correct + if (fragmentedTransaction) { + final Relationship relationship = determineRelationship(flowFiles, context.getProperty(TRANSACTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS)); + if (relationship != null) { + // if transferring back to self, penalize the FlowFiles. + if (relationship == Relationship.SELF) { + // penalize all of the FlowFiles that we are going to route to SELF. 
+ final ListIterator<FlowFile> itr = flowFiles.listIterator(); + while (itr.hasNext()) { + final FlowFile flowFile = itr.next(); + // replace the element in place with its penalized version + itr.set(session.penalize(flowFile)); + } + } + + session.transfer(flowFiles, relationship); + return null; + } + + // sort by fragment index. + Collections.sort(flowFiles, new Comparator<FlowFile>() { + @Override + public int compare(final FlowFile o1, final FlowFile o2) { + return Integer.compare(Integer.parseInt(o1.getAttribute(FRAGMENT_INDEX_ATTR)), Integer.parseInt(o2.getAttribute(FRAGMENT_INDEX_ATTR))); + } + }); + } + + return new FlowFilePoll(flowFiles, fragmentedTransaction); + } + + + /** + * Returns the key that was generated from the given statement, or <code>null</code> if no key + * was generated or it could not be determined. The generated-keys ResultSet is always closed. + * + * @param stmt the statement that generated a key + * @return the key that was generated from the given statement, or <code>null</code> if no key + * was generated or it could not be determined. + */ + private String determineGeneratedKey(final PreparedStatement stmt) { + // try-with-resources skips close() when getGeneratedKeys() returns null + try (final ResultSet generatedKeys = stmt.getGeneratedKeys()) { + if (generatedKeys != null && generatedKeys.next()) { + return generatedKeys.getString(1); + } + } catch (final SQLException sqle) { + // This is not supported by all vendors. This is a best-effort approach.
+ } + + return null; + } + + + /** + * Returns the StatementFlowFileEnclosure that should be used for executing the given SQL statement + * + * @param sql the SQL to execute + * @param conn the connection from which a PreparedStatement can be created + * @param stmtMap the existing map of SQL to PreparedStatements + * @param obtainKeys whether or not we need to obtain generated keys for INSERT statements + * @param fragmentedTransaction whether or not the SQL pertains to a fragmented transaction + * + * @return a StatementFlowFileEnclosure to use for executing the given SQL statement + * + * @throws SQLException if unable to create the appropriate PreparedStatement + */ + private StatementFlowFileEnclosure getEnclosure(final String sql, final Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap, + final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException { + StatementFlowFileEnclosure enclosure = stmtMap.get(sql); + if (enclosure != null) { + return enclosure; + } + + if (obtainKeys) { + // Create a new Prepared Statement, requesting that it return the generated keys. + PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); + + if (stmt == null) { + // since we are passing Statement.RETURN_GENERATED_KEYS, calls to conn.prepareStatement will + // in some cases (at least for DerbyDB) return null. + // We will attempt to recompile the statement without the generated keys being returned. + stmt = conn.prepareStatement(sql); + } + + // If we need to obtain keys, then we cannot do a Batch Update. In this case, + // we don't need to store the PreparedStatement in the Map because we aren't + // doing an addBatch/executeBatch. Instead, we will use the statement once + // and close it. + return new StatementFlowFileEnclosure(stmt); + } else if (fragmentedTransaction) { + // We cannot use Batch Updates if we have a transaction that spans multiple FlowFiles. 
+ // If we did, we could end up processing the statements out of order. It's quite possible + // that we could refactor the code some to allow for this, but as it is right now, this + // could cause problems. This is because we have a Map<String, StatementFlowFileEnclosure>. + // If we had a transaction that needed to execute Stmt A with some parameters, then Stmt B with + // some parameters, then Stmt A with different parameters, this would become problematic because + // the executeUpdate would be evaluated first for Stmt A (the 1st and 3rd statements, and then + // the second statement would be evaluated). + final PreparedStatement stmt = conn.prepareStatement(sql); + return new StatementFlowFileEnclosure(stmt); + } + + final PreparedStatement stmt = conn.prepareStatement(sql); + enclosure = new StatementFlowFileEnclosure(stmt); + stmtMap.put(sql, enclosure); + return enclosure; + } + + + /** + * Determines the SQL statement that should be executed for the given FlowFile + * + * @param session the session that can be used to access the given FlowFile + * @param flowFile the FlowFile whose SQL statement should be executed + * + * @return the SQL that is associated with the given FlowFile + */ + private String getSQL(final ProcessSession session, final FlowFile flowFile) { + // Read the SQL from the FlowFile's content + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer); + } + }); + + // Create the PreparedStatement to use for this FlowFile. + final String sql = new String(buffer, StandardCharsets.UTF_8); + return sql; + } + + + /** + * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes. 
+ * + * @param stmt the statement to set the parameters on + * @param attributes the attributes from which to derive parameter indices, values, and types + * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called + */ + private void setParameters(final PreparedStatement stmt, final Map<String, String> attributes) throws SQLException { + for (final Map.Entry<String, String> entry : attributes.entrySet()) { + final String key = entry.getKey(); + final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key); + if (matcher.matches()) { + final int parameterIndex = Integer.parseInt(matcher.group(1)); + + final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches(); + if (!isNumeric) { + throw new ProcessException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type"); + } + + final int jdbcType = Integer.parseInt(entry.getValue()); + final String valueAttrName = "sql.args." + parameterIndex + ".value"; + final String parameterValue = attributes.get(valueAttrName); + + try { + setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType); + } catch (final NumberFormatException nfe) { + throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe); + } + } + } + } + + + /** + * Determines which relationship the given FlowFiles should go to, based on a transaction timing out or + * transaction information not being present. 
If the FlowFiles should be processed and not transferred + * to any particular relationship yet, will return <code>null</code> + * + * @param flowFiles the FlowFiles whose relationship is to be determined + * @param transactionTimeoutMillis the maximum amount of time (in milliseconds) that we should wait + * for all FlowFiles in a transaction to be present before routing to failure + * @return the appropriate relationship to route the FlowFiles to, or <code>null</code> if the FlowFiles + * should instead be processed + */ + Relationship determineRelationship(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) { + int selectedNumFragments = 0; + final BitSet bitSet = new BitSet(); + + for (final FlowFile flowFile : flowFiles) { + final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR); + if (fragmentCount == null && flowFiles.size() == 1) { + return null; + } else if (fragmentCount == null) { + getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier " + + "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()}); + return REL_FAILURE; + } + + final int numFragments; + try { + numFragments = Integer.parseInt(fragmentCount); + } catch (final NumberFormatException nfe) { + getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; " + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount}); + return REL_FAILURE; + } + + if (numFragments < 1) { + getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; " + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount}); + return REL_FAILURE; + } + + if (selectedNumFragments == 0) { + selectedNumFragments = numFragments; + } else if 
(numFragments != selectedNumFragments) { + getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; " + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); + return REL_FAILURE; + } + + final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR); + if (fragmentIndex == null) { + getLogger().error("Cannot process {} because the fragment.index attribute is missing; " + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); + return REL_FAILURE; + } + + final int idx; + try { + idx = Integer.parseInt(fragmentIndex); + } catch (final NumberFormatException nfe) { + getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; " + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); + return REL_FAILURE; + } + + if (idx < 0) { + getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not a positive integer; " + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); + return REL_FAILURE; + } + + if (bitSet.get(idx)) { + getLogger().error("Cannot process {} because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier; " + + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); + return REL_FAILURE; + } + + bitSet.set(idx); + } + + if (selectedNumFragments == flowFiles.size()) { + return null; // no relationship to route FlowFiles to yet - process the FlowFiles. 
+ } + + long latestQueueTime = 0L; + for (final FlowFile flowFile : flowFiles) { + if (flowFile.getLastQueueDate() != null && flowFile.getLastQueueDate() > latestQueueTime) { + latestQueueTime = flowFile.getLastQueueDate(); + } + } + + if (transactionTimeoutMillis != null) { + if (latestQueueTime > 0L && System.currentTimeMillis() - latestQueueTime > transactionTimeoutMillis) { + getLogger().error("The transaction timeout has expired for the following FlowFiles; they will be routed to failure: {}", new Object[] {flowFiles}); + return REL_FAILURE; + } + } + + getLogger().debug("Not enough FlowFiles for transaction. Returning all FlowFiles to queue"); + return Relationship.SELF; // not enough FlowFiles for this transaction. Return them all to queue. + } + + /** + * Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the + * provided PreparedStatement + * + * @param stmt the PreparedStatement to set the parameter on + * @param attrName the name of the attribute that the parameter is coming from - for logging purposes + * @param parameterIndex the index of the SQL parameter to set + * @param parameterValue the value of the SQL parameter to set + * @param jdbcType the JDBC Type of the SQL parameter to set + * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter + */ + private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException { + if (parameterValue == null) { + stmt.setNull(parameterIndex, jdbcType); + } else { + switch (jdbcType) { + case Types.BIT: + stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue)); + break; + case Types.TINYINT: + stmt.setByte(parameterIndex, Byte.parseByte(parameterValue)); + break; + case Types.SMALLINT: + stmt.setShort(parameterIndex, Short.parseShort(parameterValue)); + break; + case Types.INTEGER: + 
stmt.setInt(parameterIndex, Integer.parseInt(parameterValue)); + break; + case Types.BIGINT: + stmt.setLong(parameterIndex, Long.parseLong(parameterValue)); + break; + case Types.REAL: + stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue)); + break; + case Types.FLOAT: + case Types.DOUBLE: + stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue)); + break; + case Types.DATE: + stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue))); + break; + case Types.TIME: + stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue))); + break; + case Types.TIMESTAMP: + stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue))); + break; + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGNVARCHAR: + case Types.LONGVARCHAR: + stmt.setString(parameterIndex, parameterValue); + break; + default: + throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue + + "' and a type of '" + jdbcType + "' but this is not a known data type"); + } + } + } + + + /** + * A FlowFileFilter that is responsible for ensuring that the FlowFiles returned either belong + * to the same "fragmented transaction" (i.e., 1 transaction whose information is fragmented + * across multiple FlowFiles) or that none of the FlowFiles belongs to a fragmented transaction + */ + static class TransactionalFlowFileFilter implements FlowFileFilter { + private String selectedId = null; + private int numSelected = 0; + private boolean ignoreFragmentIdentifiers = false; + + public boolean isFragmentedTransaction() { + return !ignoreFragmentIdentifiers; + } + + @Override + public FlowFileFilterResult filter(final FlowFile flowFile) { + final String fragmentId = flowFile.getAttribute(FRAGMENT_ID_ATTR); + final String fragCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR); + + // if first FlowFile selected is not part of a fragmented transaction, then + // we accept any FlowFile that is also not part of a fragmented 
transaction. + if (ignoreFragmentIdentifiers) { + if (fragmentId == null || "1".equals(fragCount)) { + return FlowFileFilterResult.ACCEPT_AND_CONTINUE; + } else { + return FlowFileFilterResult.REJECT_AND_CONTINUE; + } + } + + if (fragmentId == null || "1".equals(fragCount)) { + if (selectedId == null) { + // Only one FlowFile in the transaction. + ignoreFragmentIdentifiers = true; + return FlowFileFilterResult.ACCEPT_AND_CONTINUE; + } else { + // we've already selected 1 FlowFile, and this one doesn't match. + return FlowFileFilterResult.REJECT_AND_CONTINUE; + } + } + + if (selectedId == null) { + // select this fragment id as the chosen one. + selectedId = fragmentId; + numSelected++; + return FlowFileFilterResult.ACCEPT_AND_CONTINUE; + } + + if (selectedId.equals(fragmentId)) { + // fragment id's match. Find out if we have all of the necessary fragments or not. + final int numFragments; + if (NUMBER_PATTERN.matcher(fragCount).matches()) { + numFragments = Integer.parseInt(fragCount); + } else { + numFragments = Integer.MAX_VALUE; + } + + if (numSelected >= numFragments - 1) { + // We have all of the fragments we need for this transaction. + return FlowFileFilterResult.ACCEPT_AND_TERMINATE; + } else { + // We still need more fragments for this transaction, so accept this one and continue. + numSelected++; + return FlowFileFilterResult.ACCEPT_AND_CONTINUE; + } + } else { + return FlowFileFilterResult.REJECT_AND_CONTINUE; + } + } + } + + + /** + * A simple, immutable data structure to hold a List of FlowFiles and an indicator as to whether + * or not those FlowFiles represent a "fragmented transaction" - that is, a collection of FlowFiles + * that all must be executed as a single transaction (we refer to it as a fragment transaction + * because the information for that transaction, including SQL and the parameters, is fragmented + * across multiple FlowFiles). 
+ */ + private static class FlowFilePoll { + private final List<FlowFile> flowFiles; + private final boolean fragmentedTransaction; + + public FlowFilePoll(final List<FlowFile> flowFiles, final boolean fragmentedTransaction) { + this.flowFiles = flowFiles; + this.fragmentedTransaction = fragmentedTransaction; + } + + public List<FlowFile> getFlowFiles() { + return flowFiles; + } + + public boolean isFragmentedTransaction() { + return fragmentedTransaction; + } + } + + + /** + * A simple, immutable data structure to hold a Prepared Statement and a List of FlowFiles + * for which that statement should be evaluated. + */ + private static class StatementFlowFileEnclosure { + private final PreparedStatement statement; + private final List<FlowFile> flowFiles = new ArrayList<>(); + + public StatementFlowFileEnclosure(final PreparedStatement statement) { + this.statement = statement; + } + + public PreparedStatement getStatement() { + return statement; + } + + public List<FlowFile> getFlowFiles() { + return flowFiles; + } + + public void addFlowFile(final FlowFile flowFile) { + this.flowFiles.add(flowFile); + } + + @Override + public int hashCode() { + return statement.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return false; + } + if (!(obj instanceof StatementFlowFileEnclosure)) { + return false; + } + + final StatementFlowFileEnclosure other = (StatementFlowFileEnclosure) obj; + return statement.equals(other.getStatement()); + } + } +}
