NIFI-853: Made updates to processors

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/28549c2b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/28549c2b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/28549c2b

Branch: refs/heads/master
Commit: 28549c2b1e8970ce8be68e1907830e85676c42a3
Parents: 1a37c95
Author: Mark Payne <[email protected]>
Authored: Thu Aug 13 11:40:33 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Wed Aug 19 17:36:29 2015 -0400

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |   7 +
 .../processors/standard/ConvertJSONToSQL.java   | 679 ++++++++++++++
 .../apache/nifi/processors/standard/PutSQL.java | 920 +++++++++++++++++++
 .../standard/TestConvertJSONToSQL.java          | 431 +++++++++
 .../nifi/processors/standard/TestPutSQL.java    | 664 +++++++++++++
 .../malformed-person-extra-comma.json           |   4 +
 .../TestConvertJSONToSQL/person-1.json          |   5 +
 .../person-with-extra-field.json                |   6 +
 .../person-with-null-code.json                  |   5 +
 .../person-without-code.json                    |   4 +
 .../TestConvertJSONToSQL/person-without-id.json |   4 +
 .../resources/TestConvertJSONToSQL/persons.json |  21 +
 .../standard/ConvertFlatJSONToSQL.java          | 600 ------------
 .../apache/nifi/processors/standard/PutSQL.java | 203 ----
 .../standard/TestConvertFlatJSONToSQL.java      | 294 ------
 .../nifi/processors/standard/TestPutSQL.java    | 246 -----
 .../malformed-person-extra-comma.json           |   4 -
 .../TestConvertFlatJSONToSQL/person-1.json      |   5 -
 .../person-without-code.json                    |   4 -
 .../person-without-id.json                      |   4 -
 pom.xml                                         |   5 +
 21 files changed, 2755 insertions(+), 1360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index fd267d1..729b495 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -220,6 +220,13 @@ language governing permissions and limitations under the 
License. -->
                         
<exclude>src/test/resources/TestMergeContent/head</exclude>
                         
<exclude>src/test/resources/TestMergeContent/user.avsc</exclude>
                         
<exclude>src/test/resources/TestMergeContent/place.avsc</exclude>
+                        
<exclude>src/test/resources/TestConvertJSONToSQL/person-1.json</exclude>
+                        
<exclude>src/test/resources/TestConvertJSONToSQL/persons.json</exclude>
+                        
<exclude>src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json</exclude>
+                        
<exclude>src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json</exclude>
+                        
<exclude>src/test/resources/TestConvertJSONToSQL/person-without-code.json</exclude>
+                        
<exclude>src/test/resources/TestConvertJSONToSQL/person-with-null-code.json</exclude>
+                        
<exclude>src/test/resources/TestConvertJSONToSQL/person-without-id.json</exclude>
                         
<exclude>src/test/resources/TestModifyBytes/noFooter.txt</exclude>
                         
<exclude>src/test/resources/TestModifyBytes/noFooter_noHeader.txt</exclude>
                         
<exclude>src/test/resources/TestModifyBytes/noHeader.txt</exclude>

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
new file mode 100644
index 0000000..7eda593
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.ObjectHolder;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+
+@SideEffectFree
+@SupportsBatching
+@SeeAlso(PutSQL.class)
+@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", 
"flat"})
+@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or 
INSERT SQL statement. The incoming FlowFile is expected to be "
+        + "\"flat\" JSON message, meaning that it consists of a single JSON 
element and each field maps to a simple type. If a field maps to "
+        + "a JSON object, that JSON object will be interpreted as Text. If the 
input is an array of JSON elements, each element in the array is "
+        + "output as a separate FlowFile to the 'sql' relationship. Upon 
successful conversion, the original FlowFile is routed to the 'original' "
+        + "relationship and the SQL is routed to the 'sql' relationship.")
+@WritesAttributes({
+    @WritesAttribute(attribute="mime.type", description="Sets mime.type of 
FlowFile that is routed to 'sql' to 'text/plain'."),
+    @WritesAttribute(attribute="sql.table", description="Sets the sql.table 
attribute of FlowFile that is routed to 'sql' to the name of the table that is 
updated by the SQL statement."),
+    @WritesAttribute(attribute="sql.catalog", description="If the Catalog name 
is set for this database, specifies the name of the catalog that the SQL 
statement will update. "
+            + "If no catalog is used, this attribute will not be added."),
+    @WritesAttribute(attribute="fragment.identifier", description="All 
FlowFiles routed to the 'sql' relationship for the same incoming FlowFile 
(multiple will be output for the same incoming "
+            + "FlowFile if the incoming FlowFile is a JSON Array) will have 
the same value for the fragment.identifier attribute. This can then be used to 
correlate the results."),
+    @WritesAttribute(attribute="fragment.count", description="The number of 
SQL FlowFiles that were produced for same incoming FlowFile. This can be used 
in conjunction with the "
+            + "fragment.identifier attribute in order to know how many 
FlowFiles belonged to the same incoming FlowFile."),
+    @WritesAttribute(attribute="fragment.index", description="The position of 
this FlowFile in the list of outgoing FlowFiles that were all derived from the 
same incoming FlowFile. This can be "
+            + "used in conjunction with the fragment.identifier and 
fragment.count attributes to know which FlowFiles originated from the same 
incoming FlowFile and in what order the SQL "
+            + "FlowFiles were produced"),
+    @WritesAttribute(attribute="sql.args.N.type", description="The output SQL 
statements are parameterized in order to avoid SQL Injection Attacks. The types 
of the Parameters "
+            + "to use are stored in attributes named sql.args.1.type, 
sql.args.2.type, sql.args.3.type, and so on. The type is a number representing 
a JDBC Type constant. "
+            + "Generally, this is useful only for software to read and 
interpret but is added so that a processor such as PutSQL can understand how to 
interpret the values."),
+    @WritesAttribute(attribute="sql.args.N.value", description="The output SQL 
statements are parameterized in order to avoid SQL Injection Attacks. The 
values of the Parameters "
+            + "to use are stored in the attributes named sql.args.1.value, 
sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a 
corresponding "
+            + "sql.args.N.type attribute that indicates how the value should 
be interpreted when inserting it into the database.")
+})
+public class ConvertJSONToSQL extends AbstractProcessor {
+    // The two values accepted by the "Statement Type" property.
+    private static final String UPDATE_TYPE = "UPDATE";
+    private static final String INSERT_TYPE = "INSERT";
+
+    // The two values accepted by the "Unmatched Field Behavior" property.
+    static final AllowableValue IGNORE_UNMATCHED_FIELD = new 
AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
+        "Any field in the JSON document that cannot be mapped to a column in 
the database is ignored");
+    static final AllowableValue FAIL_UNMATCHED_FIELD = new 
AllowableValue("Fail", "Fail",
+        "If the JSON document has any field that cannot be mapped to a column 
in the database, the FlowFile will be routed to the failure relationship");
+
+    // Required controller service (a DBCPService) from which onTrigger obtains database connections.
+    static final PropertyDescriptor CONNECTION_POOL = new 
PropertyDescriptor.Builder()
+            .name("JDBC Connection Pool")
+            .description("Specifies the JDBC Connection Pool to use in order 
to convert the JSON message to a SQL statement. "
+                    + "The Connection Pool is necessary in order to determine 
the appropriate database column types.")
+            .identifiesControllerService(DBCPService.class)
+            .required(true)
+            .build();
+    // Required; restricted to UPDATE or INSERT. No default value is declared here.
+    static final PropertyDescriptor STATEMENT_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Statement Type")
+            .description("Specifies the type of SQL Statement to generate")
+            .required(true)
+            .allowableValues(UPDATE_TYPE, INSERT_TYPE)
+            .build();
+    // Required; Expression Language is enabled and is evaluated against the incoming FlowFile in onTrigger.
+    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the table that the statement should 
update")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    // Optional; Expression Language is enabled and is evaluated against the incoming FlowFile in onTrigger.
+    static final PropertyDescriptor CATALOG_NAME = new 
PropertyDescriptor.Builder()
+            .name("Catalog Name")
+            .description("The name of the catalog that the statement should 
update. This may not apply for the database that you are updating. In this 
case, leave the field empty")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    // Restricted to "true"/"false"; defaults to "true". Read as a boolean in onTrigger.
+    static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new 
PropertyDescriptor.Builder()
+            .name("Translate Field Names")
+            .description("If true, the Processor will attempt to translate 
JSON field names into the appropriate column names for the table specified. "
+                    + "If false, the JSON field names must match the column 
names exactly, or the column will not be updated")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+    // Restricted to the two AllowableValues declared above; defaults to ignoring unmatched fields.
+    static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new 
PropertyDescriptor.Builder()
+        .name("Unmatched Field Behavior")
+        .description("If an incoming JSON element has a field that does not 
map to any of the database table's columns, this property specifies how to 
handle the situation")
+        .allowableValues(IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD)
+        .defaultValue(IGNORE_UNMATCHED_FIELD.getValue())
+        .build();
+    // Optional; Expression Language is enabled. When unset for an UPDATE, onTrigger asks for the table's primary keys.
+    static final PropertyDescriptor UPDATE_KEY = new 
PropertyDescriptor.Builder()
+            .name("Update Keys")
+            .description("A comma-separated list of column names that uniquely 
identifies a row in the database for UPDATE statements. "
+                    + "If the Statement Type is UPDATE and this property is 
not set, the table's Primary Keys are used. "
+                    + "In this case, if no Primary Key exists, the conversion 
to SQL will fail. "
+                    + "This property is ignored if the Statement Type is 
INSERT")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(true)
+            .build();
+
+
+    // Receives the incoming JSON FlowFile once every element has been converted.
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("When a FlowFile is converted to SQL, the original 
JSON FlowFile is routed to this relationship")
+            .build();
+    // Receives one generated FlowFile per converted JSON element.
+    static final Relationship REL_SQL = new Relationship.Builder()
+            .name("sql")
+            .description("A FlowFile is routed to this relationship when its 
contents have successfully been converted into a SQL statement")
+            .build();
+    // Receives the incoming FlowFile when the schema lookup, JSON parsing, or SQL generation fails.
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if it 
cannot be converted into a SQL statement. Common causes include invalid JSON "
+                    + "content or the JSON content missing a required field 
(if using an INSERT statement type).")
+            .build();
+
+    // Upper bound on the number of table schemas retained in schemaCache.
+    private static final int SCHEMA_CACHE_SIZE = 100;
+
+    /**
+     * Cache of table schemas keyed by catalog and table name. The map is not thread-safe on its
+     * own; callers guard every access with <code>synchronized (this)</code>.
+     *
+     * The eldest (first-inserted) entry is evicted only once the cache grows beyond
+     * SCHEMA_CACHE_SIZE entries. Returning true unconditionally from removeEldestEntry would
+     * evict on every insertion, including the entry just inserted into an empty map, so nothing
+     * would ever be cached.
+     */
+    private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(SCHEMA_CACHE_SIZE) {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        protected boolean removeEldestEntry(final Map.Entry<SchemaKey, TableSchema> eldest) {
+            return size() > SCHEMA_CACHE_SIZE;
+        }
+    };
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(CONNECTION_POOL);
+        properties.add(STATEMENT_TYPE);
+        properties.add(TABLE_NAME);
+        properties.add(CATALOG_NAME);
+        properties.add(TRANSLATE_FIELD_NAMES);
+        properties.add(UNMATCHED_FIELD_BEHAVIOR);
+        properties.add(UPDATE_KEY);
+        return properties;
+    }
+
+
+    /**
+     * Lists the relationships this processor can route FlowFiles to.
+     *
+     * @return a new, mutable set holding the original, sql and failure relationships
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        for (final Relationship relationship : new Relationship[] {REL_ORIGINAL, REL_SQL, REL_FAILURE}) {
+            relationships.add(relationship);
+        }
+        return relationships;
+    }
+
+
+    // Empties the schema cache. The lock is the processor instance itself, the same
+    // monitor that onTrigger holds while it reads from and writes to the cache.
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        synchronized (this) {
+            schemaCache.clear();
+        }
+    }
+
+    /**
+     * Takes one FlowFile from the session, parses its content as JSON and emits one
+     * parameterized SQL FlowFile per JSON element (a non-array root is treated as a
+     * single-element array).
+     *
+     * Generated FlowFiles go to REL_SQL and the incoming FlowFile goes to REL_ORIGINAL.
+     * If the schema lookup, the JSON parsing, or the SQL generation for any element fails,
+     * the incoming FlowFile is routed to REL_FAILURE instead.
+     *
+     * @param context supplies the property values and the DBCPService
+     * @param session used to read the incoming FlowFile and to create, write and transfer the results
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final boolean translateFieldNames = 
context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
+        final boolean ignoreUnmappedFields = 
IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
+        final String statementType = 
context.getProperty(STATEMENT_TYPE).getValue();
+        final String updateKeys = 
context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        final String catalog = 
context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
+        // Primary keys are only requested for UPDATE statements with no explicit Update Keys.
+        final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) 
&& updateKeys == null;
+
+        // get the database schema from the cache, if one exists. We do this 
in a synchronized block, rather than
+        // using a ConcurrentMap because the Map that we are using is a 
LinkedHashMap with a capacity such that if
+        // the Map grows beyond this capacity, old elements are evicted. We do 
this in order to avoid filling the
+        // Java Heap if there are a lot of different SQL statements being 
generated that reference different tables.
+        // NOTE(review): the cache key is built from catalog and tableName only, while the schema is
+        // built with translateFieldNames and includePrimaryKeys. Presumably a schema cached without
+        // primary keys could later be reused for a FlowFile that needs them - confirm against
+        // SchemaKey and TableSchema.from.
+        TableSchema schema;
+        synchronized (this) {
+            schema = schemaCache.get(schemaKey);
+            if (schema == null) {
+                // No schema exists for this table yet. Query the database to 
determine the schema and put it into the cache.
+                final DBCPService dbcpService = 
context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
+                try (final Connection conn = dbcpService.getConnection()) {
+                    schema = TableSchema.from(conn, catalog, tableName, 
translateFieldNames, includePrimaryKeys);
+                    schemaCache.put(schemaKey, schema);
+                } catch (final SQLException e) {
+                    getLogger().error("Failed to convert {} into a SQL 
statement due to {}; routing to failure", new Object[] {flowFile, 
e.toString()}, e);
+                    session.transfer(flowFile, REL_FAILURE);
+                    return;
+                }
+            }
+        }
+
+        // Parse the JSON document
+        final ObjectMapper mapper = new ObjectMapper();
+        final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null);
+        try {
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (final InputStream bufferedIn = new 
BufferedInputStream(in)) {
+                        rootNodeRef.set(mapper.readTree(bufferedIn));
+                    }
+                }
+            });
+        } catch (final ProcessException pe) {
+            getLogger().error("Failed to parse {} as JSON due to {}; routing 
to failure", new Object[] {flowFile, pe.toString()}, pe);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final JsonNode rootNode = rootNodeRef.get();
+
+        // The node may or may not be a Json Array. If it isn't, we will 
create an
+        // ArrayNode and add just the root node to it. We do this so that we 
can easily iterate
+        // over the array node, rather than duplicating the logic or creating 
another function that takes many variables
+        // in order to implement the logic.
+        final ArrayNode arrayNode;
+        if (rootNode.isArray()) {
+            arrayNode = (ArrayNode) rootNode;
+        } else {
+            final JsonNodeFactory nodeFactory = JsonNodeFactory.instance;
+            arrayNode = new ArrayNode(nodeFactory);
+            arrayNode.add(rootNode);
+        }
+
+        // One identifier is shared by every SQL FlowFile derived from this incoming FlowFile.
+        final String fragmentIdentifier = UUID.randomUUID().toString();
+
+        // Tracks the SQL FlowFiles created so far so they can be removed if a later element fails.
+        final Set<FlowFile> created = new HashSet<>();
+        for (int i=0; i < arrayNode.size(); i++) {
+            final JsonNode jsonNode = arrayNode.get(i);
+
+            final String sql;
+            final Map<String, String> attributes = new HashMap<>();
+
+            try {
+                // Any statement type other than INSERT is handled as an UPDATE.
+                if (INSERT_TYPE.equals(statementType)) {
+                    sql = generateInsert(jsonNode, attributes, tableName, 
schema, translateFieldNames, ignoreUnmappedFields);
+                } else {
+                    sql = generateUpdate(jsonNode, attributes, tableName, 
updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
+                }
+            } catch (final ProcessException pe) {
+                getLogger().error("Failed to convert {} to a SQL {} statement 
due to {}; routing to failure",
+                        new Object[] { flowFile, statementType, pe.toString() 
}, pe);
+                // Discard the SQL FlowFiles produced for earlier elements and fail the whole input.
+                session.remove(created);
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+
+            FlowFile sqlFlowFile = session.create(flowFile);
+            created.add(sqlFlowFile);
+
+            sqlFlowFile = session.write(sqlFlowFile, new 
OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException 
{
+                    out.write(sql.getBytes(StandardCharsets.UTF_8));
+                }
+            });
+
+            // The sql.args.N.* attributes were already added to the map by generateInsert/generateUpdate.
+            // fragment.index is zero-based because it is taken directly from the loop counter.
+            attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+            attributes.put("sql.table", tableName);
+            attributes.put("fragment.identifier", fragmentIdentifier);
+            attributes.put("fragment.count", String.valueOf(arrayNode.size()));
+            attributes.put("fragment.index", String.valueOf(i));
+
+            if (catalog != null) {
+                attributes.put("sql.catalog", catalog);
+            }
+
+            sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes);
+            session.transfer(sqlFlowFile, REL_SQL);
+        }
+
+        session.transfer(flowFile, REL_ORIGINAL);
+    }
+
+    /**
+     * Collects the field names of the given JSON node, each passed through normalizeColumnName.
+     *
+     * @param node the JSON node whose field names are read
+     * @param translateFieldNames forwarded unchanged to normalizeColumnName
+     * @return a new set containing the normalized name of every field of the node
+     */
+    private Set<String> getNormalizedColumnNames(final JsonNode node, final boolean translateFieldNames) {
+        final Set<String> columnNames = new HashSet<>();
+        for (final Iterator<String> names = node.getFieldNames(); names.hasNext();) {
+            final String rawName = names.next();
+            columnNames.add(normalizeColumnName(rawName, translateFieldNames));
+        }
+        return columnNames;
+    }
+
+    /**
+     * Builds a parameterized INSERT statement for one JSON element.
+     *
+     * For the N-th mapped field (counting from 1) the column's JDBC type is stored in the
+     * "sql.args.N.type" attribute. Unless the JSON value is null, its text is stored in
+     * "sql.args.N.value", cut down to the column size when the column reports one and the
+     * text is longer.
+     *
+     * @param rootNode the JSON element to convert
+     * @param attributes map that receives the sql.args.N.type / sql.args.N.value entries
+     * @param tableName name of the table to insert into
+     * @param schema column metadata for the table
+     * @param translateFieldNames forwarded to normalizeColumnName when matching fields to columns
+     * @param ignoreUnmappedFields if true, fields without a matching column are skipped; if false they cause a failure
+     * @return the INSERT statement, with one '?' placeholder per mapped field
+     * @throws ProcessException if a required column has no JSON field, if a field cannot be mapped
+     *             and unmapped fields are not ignored, or if no field maps to any column
+     */
+    private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
+        final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
+
+        // Every required column must be present among the JSON fields.
+        final Set<String> jsonColumnNames = getNormalizedColumnNames(rootNode, translateFieldNames);
+        for (final String requiredColName : schema.getRequiredColumnNames()) {
+            final String normalizedRequired = normalizeColumnName(requiredColName, translateFieldNames);
+            if (!jsonColumnNames.contains(normalizedRequired)) {
+                throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
+            }
+        }
+
+        // The column list and the placeholder list are built side by side, one entry per mapped field.
+        final StringBuilder columnList = new StringBuilder();
+        final StringBuilder placeholders = new StringBuilder();
+        int argIndex = 0;
+
+        final Iterator<String> jsonFields = rootNode.getFieldNames();
+        while (jsonFields.hasNext()) {
+            final String jsonField = jsonFields.next();
+            final String columnKey = normalizeColumnName(jsonField, translateFieldNames);
+            final ColumnDescription column = schema.getColumns().get(columnKey);
+
+            if (column == null) {
+                if (ignoreUnmappedFields) {
+                    continue;
+                }
+                throw new ProcessException("Cannot map JSON field '" + jsonField + "' to any column in the database");
+            }
+
+            if (argIndex > 0) {
+                columnList.append(", ");
+                placeholders.append(", ");
+            }
+            argIndex++;
+
+            columnList.append(column.getColumnName());
+            placeholders.append("?");
+
+            // The type is always recorded; the value only when the JSON value is not null.
+            attributes.put("sql.args." + argIndex + ".type", String.valueOf(column.getDataType()));
+
+            final Integer maxLength = column.getColumnSize();
+            final JsonNode valueNode = rootNode.get(jsonField);
+            if (valueNode.isNull()) {
+                continue;
+            }
+
+            final String text = valueNode.asText();
+            final boolean tooLong = maxLength != null && text.length() > maxLength;
+            attributes.put("sql.args." + argIndex + ".value", tooLong ? text.substring(0, maxLength) : text);
+        }
+
+        if (argIndex == 0) {
+            throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table");
+        }
+
+        final StringBuilder sqlBuilder = new StringBuilder();
+        sqlBuilder.append("INSERT INTO ").append(tableName).append(" (");
+        sqlBuilder.append(columnList);
+        sqlBuilder.append(") VALUES (");
+        sqlBuilder.append(placeholders);
+        sqlBuilder.append(")");
+        return sqlBuilder.toString();
+    }
+
+    /**
+     * Builds a parameterized SQL UPDATE statement for the given flat JSON object and records the statement's
+     * arguments in the supplied attribute map.
+     * <p>
+     * JSON fields that map to Update Key columns are used for the WHERE clause; all other mapped fields are used
+     * for the SET clause. For the N-th "?" placeholder (1-based, SET clause first, then WHERE clause) the JDBC type
+     * of the column is stored under "sql.args.N.type" and the value, truncated to the column size when one is known,
+     * under "sql.args.N.value". A JSON null in the SET clause gets a type attribute but no value attribute.
+     *
+     * @param rootNode the JSON object whose fields supply the column values
+     * @param attributes map that receives the "sql.args.N.type" / "sql.args.N.value" entries
+     * @param tableName name of the table to update
+     * @param updateKeys comma-separated column names identifying the row(s) to update, or null to use the
+     *            table's primary key columns
+     * @param schema column and primary key information for the table
+     * @param translateFieldNames whether field and column names are normalized (see normalizeColumnName) before matching
+     * @param ignoreUnmappedFields if true, JSON fields with no matching column are skipped; if false, such a
+     *            field causes a ProcessException
+     * @return the UPDATE statement with "?" placeholders
+     * @throws ProcessException if no Update Keys can be determined, if the JSON lacks a field for an Update Key,
+     *             or if a JSON field cannot be mapped to a column and ignoreUnmappedFields is false
+     */
+    private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
+        final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
+
+        // Explicit Update Keys win; otherwise fall back to the table's primary key columns.
+        final Set<String> updateKeyNames;
+        if (updateKeys == null) {
+            updateKeyNames = schema.getPrimaryKeyColumnNames();
+        } else {
+            updateKeyNames = new HashSet<>();
+            for (final String updateKey : updateKeys.split(",")) {
+                updateKeyNames.add(updateKey.trim());
+            }
+        }
+
+        if (updateKeyNames.isEmpty()) {
+            throw new ProcessException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
+        }
+
+        final StringBuilder sqlBuilder = new StringBuilder();
+        int fieldCount = 0;
+        sqlBuilder.append("UPDATE ").append(tableName).append(" SET ");
+
+        // Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON
+        // for each of the Update Key fields.
+        final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
+        final Set<String> normalizedUpdateNames = new HashSet<>();
+        for (final String uk : updateKeyNames) {
+            final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
+            normalizedUpdateNames.add(normalizedUK);
+
+            if (!normalizedFieldNames.contains(normalizedUK)) {
+                throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + " Key column '" + uk + "'");
+            }
+        }
+
+        // First pass over the JSON fields: build the SET clause from every mapped field that is not an Update Key,
+        // recording each placeholder's type in "sql.args.N.type" and its value in "sql.args.N.value".
+        Iterator<String> fieldNames = rootNode.getFieldNames();
+        while (fieldNames.hasNext()) {
+            final String fieldName = fieldNames.next();
+
+            final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
+            final ColumnDescription desc = schema.getColumns().get(normalizedColName);
+
+            if (desc == null) {
+                // A field without a matching column is only an error when the caller did not ask to ignore it.
+                if (!ignoreUnmappedFields) {
+                    throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database");
+                }
+                continue;
+            }
+
+            // Update Key columns belong in the WHERE clause; they are handled in the second pass below.
+            if (normalizedUpdateNames.contains(normalizedColName)) {
+                continue;
+            }
+
+            // fieldCount is incremented here, so from this point on it is the 1-based index of this placeholder.
+            if (fieldCount++ > 0) {
+                sqlBuilder.append(", ");
+            }
+
+            sqlBuilder.append(desc.getColumnName()).append(" = ?");
+            final int sqlType = desc.getDataType();
+            attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+
+            final Integer colSize = desc.getColumnSize();
+
+            // For a JSON null only the type attribute is written; no value attribute is added.
+            final JsonNode fieldNode = rootNode.get(fieldName);
+            if (!fieldNode.isNull()) {
+                String fieldValue = fieldNode.asText();
+                if (colSize != null && fieldValue.length() > colSize) {
+                    fieldValue = fieldValue.substring(0, colSize);
+                }
+                attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+            }
+        }
+
+        // Second pass: build the WHERE clause from the Update Key fields.
+        sqlBuilder.append(" WHERE ");
+
+        fieldNames = rootNode.getFieldNames();
+        int whereFieldCount = 0;
+        while (fieldNames.hasNext()) {
+            final String fieldName = fieldNames.next();
+
+            final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
+            final ColumnDescription desc = schema.getColumns().get(normalizedColName);
+            if (desc == null) {
+                continue;
+            }
+
+            // Only Update Key columns take part in the WHERE clause.
+            if (!normalizedUpdateNames.contains(normalizedColName)) {
+                continue;
+            }
+
+            if (whereFieldCount++ > 0) {
+                sqlBuilder.append(" AND ");
+            }
+            fieldCount++;
+
+            // Use the real column name, exactly as the SET clause does. The normalized name is only a lookup key
+            // and, when names are translated, is not a valid column identifier (e.g. USER_ID becomes USERID).
+            sqlBuilder.append(desc.getColumnName()).append(" = ?");
+            final int sqlType = desc.getDataType();
+            attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+
+            final Integer colSize = desc.getColumnSize();
+            String fieldValue = rootNode.get(fieldName).asText();
+            if (colSize != null && fieldValue.length() > colSize) {
+                fieldValue = fieldValue.substring(0, colSize);
+            }
+            attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+        }
+
+        return sqlBuilder.toString();
+    }
+
+    private static String normalizeColumnName(final String colName, final 
boolean translateColumnNames) {
+        return translateColumnNames ? colName.toUpperCase().replace("_", "") : 
colName;
+    }
+
+    /**
+     * Describes a database table: its columns keyed by normalized column name, the names of its required
+     * columns, and the (normalized) names of its primary key columns.
+     */
+    private static class TableSchema {
+        private final List<String> requiredColumnNames;
+        private final Set<String> primaryKeyColumnNames;
+        private final Map<String, ColumnDescription> columns;
+
+        /**
+         * @param columnDescriptions descriptions of every column in the table
+         * @param translateColumnNames whether column names are normalized before being used as map keys
+         * @param primaryKeyColumnNames names of the primary key columns; stored as given
+         */
+        private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
+                final Set<String> primaryKeyColumnNames) {
+            this.columns = new HashMap<>();
+            this.primaryKeyColumnNames = primaryKeyColumnNames;
+
+            this.requiredColumnNames = new ArrayList<>();
+            for (final ColumnDescription desc : columnDescriptions) {
+                columns.put(ConvertJSONToSQL.normalizeColumnName(desc.columnName, translateColumnNames), desc);
+                if (desc.isRequired()) {
+                    requiredColumnNames.add(desc.columnName);
+                }
+            }
+        }
+
+        /** @return the columns of the table, keyed by normalized column name */
+        public Map<String, ColumnDescription> getColumns() {
+            return columns;
+        }
+
+        /** @return the (un-normalized) names of all columns flagged as required */
+        public List<String> getRequiredColumnNames() {
+            return requiredColumnNames;
+        }
+
+        /** @return the primary key column names this schema was created with */
+        public Set<String> getPrimaryKeyColumnNames() {
+            return primaryKeyColumnNames;
+        }
+
+        /**
+         * Reads the schema of a table from the database metadata.
+         *
+         * @param conn connection used to query the metadata; it is not closed by this method
+         * @param catalog catalog to look in, passed straight to the metadata calls
+         * @param tableName name of the table to describe
+         * @param translateColumnNames whether column names are normalized before matching
+         * @param includePrimaryKeys whether the primary key columns should be looked up; if false the schema
+         *            has an empty primary key set
+         * @return the schema of the table
+         * @throws SQLException if the metadata cannot be read
+         */
+        public static TableSchema from(final Connection conn, final String catalog, final String tableName,
+                final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
+            final List<ColumnDescription> cols = new ArrayList<>();
+            // Metadata ResultSets hold driver resources; close them as soon as they have been read.
+            try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%")) {
+                while (colrs.next()) {
+                    cols.add(ColumnDescription.from(colrs));
+                }
+            }
+
+            final Set<String> primaryKeyColumns = new HashSet<>();
+            if (includePrimaryKeys) {
+                try (final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName)) {
+                    while (pkrs.next()) {
+                        final String colName = pkrs.getString("COLUMN_NAME");
+                        primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames));
+                    }
+                }
+            }
+
+            return new TableSchema(cols, translateColumnNames, primaryKeyColumns);
+        }
+    }
+
+    /**
+     * Immutable description of a single table column: its name, JDBC data type, whether a value must be
+     * supplied for it, and its size (null when the metadata reports a size of 0).
+     */
+    private static class ColumnDescription {
+        private final String columnName;
+        private final int dataType;
+        private final boolean required;
+        private final Integer columnSize;
+
+        private ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) {
+            this.columnName = columnName;
+            this.dataType = dataType;
+            this.required = required;
+            this.columnSize = columnSize;
+        }
+
+        /** @return the value of the DATA_TYPE metadata column */
+        public int getDataType() {
+            return dataType;
+        }
+
+        /** @return the column size, or null if the metadata reported 0 */
+        public Integer getColumnSize() {
+            return columnSize;
+        }
+
+        /** @return the column name as reported by the database metadata */
+        public String getColumnName() {
+            return columnName;
+        }
+
+        /** @return true if the column is not nullable, not auto-incremented and has no default value */
+        public boolean isRequired() {
+            return required;
+        }
+
+        /**
+         * Creates a description from the current row of a column-metadata ResultSet.
+         *
+         * @param resultSet ResultSet positioned on a row exposing the COLUMN_NAME, DATA_TYPE, COLUMN_SIZE,
+         *            IS_NULLABLE, COLUMN_DEF and IS_AUTOINCREMENT columns
+         * @return the description of that column
+         * @throws SQLException if any of the metadata columns cannot be read
+         */
+        public static ColumnDescription from(final ResultSet resultSet) throws SQLException {
+            final String columnName = resultSet.getString("COLUMN_NAME");
+            final int dataType = resultSet.getInt("DATA_TYPE");
+            final int colSize = resultSet.getInt("COLUMN_SIZE");
+
+            // An empty IS_NULLABLE is treated as nullable. A driver that returns SQL NULL here is handled the
+            // same way instead of failing with a NullPointerException.
+            final String nullableValue = resultSet.getString("IS_NULLABLE");
+            final boolean isNullable = nullableValue == null || nullableValue.isEmpty() || "YES".equalsIgnoreCase(nullableValue);
+            final String defaultValue = resultSet.getString("COLUMN_DEF");
+            final String autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT");
+            final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
+            final boolean required = !isNullable && !isAutoIncrement && defaultValue == null;
+
+            return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize);
+        }
+    }
+
+    /**
+     * Immutable value object identifying a table by catalog and table name. Either component may be null;
+     * two keys are equal when both components are equal.
+     */
+    private static class SchemaKey {
+        private final String catalog;
+        private final String tableName;
+
+        public SchemaKey(final String catalog, final String tableName) {
+            this.catalog = catalog;
+            this.tableName = tableName;
+        }
+
+        @Override
+        public int hashCode() {
+            // 31-based combination of both components; a null component contributes 0.
+            final int catalogHash = (catalog == null) ? 0 : catalog.hashCode();
+            final int tableHash = (tableName == null) ? 0 : tableName.hashCode();
+            return 31 * (31 + catalogHash) + tableHash;
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+
+            final SchemaKey that = (SchemaKey) obj;
+
+            final boolean sameCatalog = (catalog == null) ? that.catalog == null : catalog.equals(that.catalog);
+            if (!sameCatalog) {
+                return false;
+            }
+
+            return (tableName == null) ? that.tableName == null : tableName.equals(that.tableName);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
new file mode 100644
index 0000000..b087737
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.BatchUpdateException;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientException;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+@SupportsBatching
+@SeeAlso(ConvertJSONToSQL.class)
+@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
+@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content 
of an incoming FlowFile is expected to be the SQL command "
+        + "to execute. The SQL command may use the ? to escape parameters. In 
this case, the parameters to use must exist as FlowFile attributes "
+        + "with the naming convention sql.args.N.type and sql.args.N.value, 
where N is a positive integer. The sql.args.N.type is expected to be "
+        + "a number indicating the JDBC Type. The content of the FlowFile is 
expected to be in UTF-8 format.")
+@ReadsAttributes({
+    @ReadsAttribute(attribute="fragment.identifier", description="If the 
<Support Fragment Transactions> property is true, this attribute is used to 
determine whether or "
+            + "not two FlowFiles belong to the same transaction."),
+    @ReadsAttribute(attribute="fragment.count", description="If the <Support 
Fragment Transactions> property is true, this attribute is used to determine 
how many FlowFiles "
+            + "are needed to complete the transaction."),
+    @ReadsAttribute(attribute="fragment.index", description="If the <Support 
Fragment Transactions> property is true, this attribute is used to determine 
the order that the FlowFiles "
+            + "in a transaction should be evaluated."),
+    @ReadsAttribute(attribute="sql.args.N.type", description="Incoming 
FlowFiles are expected to be parameterized SQL statements. The type of each 
Parameter is specified as an integer "
+            + "that represents the JDBC Type of the parameter."),
+    @ReadsAttribute(attribute="sql.args.N.value", description="Incoming 
FlowFiles are expected to be parameterized SQL statements. The value of the 
Parameters are specified as "
+            + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so 
on. The type of the sql.args.1.value Parameter is specified by the 
sql.args.1.type attribute.")
+})
+@WritesAttributes({
+    @WritesAttribute(attribute="sql.generated.key", description="If the 
database generated a key for an INSERT statement and the Obtain Generated Keys 
property is set to true, "
+            + "this attribute will be added to indicate the generated key, if 
possible. This feature is not supported by all database vendors.")
+})
+public class PutSQL extends AbstractProcessor {
+
+    // Controller service that supplies the JDBC connections on which the incoming SQL statements are executed.
+    static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
+            .name("JDBC Connection Pool")
+            .description("Specifies the JDBC Connection Pool to use in order to connect to the database "
+                    + "and execute the SQL statements contained in incoming FlowFiles.")
+            .identifiesControllerService(DBCPService.class)
+            .required(true)
+            .build();
+    // Whether FlowFiles sharing a fragment.identifier are held back and executed together as one transaction.
+    static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder()
+            .name("Support Fragmented Transactions")
+            .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
+                    + "If the fragment.count value is greater than 1, the Processor will not process any FlowFile with that fragment.identifier until all are available; "
+                    + "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. "
+                    + "This provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+    // Optional limit on how long to wait for the remaining fragments of a fragmented transaction.
+    static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Transaction Timeout")
+            .description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute "
+                    + "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship")
+            .required(false)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+    // Preferred number of FlowFiles handled per database transaction.
+    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The preferred number of FlowFiles to put to the database in a single transaction")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+    // Whether database-generated keys are copied onto the FlowFile as the "sql.generated.key" attribute.
+    static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new PropertyDescriptor.Builder()
+            .name("Obtain Generated Keys")
+            .description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generated.key attribute. "
+                    + "This may result in slightly slower performance and is not supported by all databases.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    // Destination for FlowFiles whose SQL was executed successfully.
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the 
database is successfully updated")
+            .build();
+    // Destination for FlowFiles whose update failed in a way that may succeed if attempted again.
+    static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("A FlowFile is routed to this relationship if the 
database cannot be updated but attempting the operation again may succeed")
+            .build();
+    // Destination for FlowFiles whose update failed in a way that retrying will not fix.
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the 
database cannot be updated and retrying the operation will also fail, "
+                    + "such as an invalid query or an integrity constraint 
violation")
+            .build();
+
+    // Matches attribute names of the form "sql.args.N.type"; group 1 captures the digits N.
+    private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = 
Pattern.compile("sql\\.args\\.(\\d+)\\.type");
+    // Matches one or more consecutive digits. NOTE(review): its use is outside this excerpt - confirm purpose.
+    private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
+
+    // Names of the FlowFile attributes that describe a fragmented transaction: the identifier shared by all
+    // fragments, the position of a fragment within the transaction, and the total number of fragments.
+    private static final String FRAGMENT_ID_ATTR = "fragment.identifier";
+    private static final String FRAGMENT_INDEX_ATTR = "fragment.index";
+    private static final String FRAGMENT_COUNT_ATTR = "fragment.count";
+
+    /**
+     * @return a new, mutable list of the properties supported by this processor, in display order
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        Collections.addAll(descriptors,
+            CONNECTION_POOL,
+            SUPPORT_TRANSACTIONS,
+            TRANSACTION_TIMEOUT,
+            BATCH_SIZE,
+            OBTAIN_GENERATED_KEYS);
+        return descriptors;
+    }
+
+    /**
+     * @return a new, mutable set holding the success, retry and failure relationships
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        Collections.addAll(relationships, REL_SUCCESS, REL_RETRY, REL_FAILURE);
+        return relationships;
+    }
+
+
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFilePoll poll = pollFlowFiles(context, session);
+        if (poll == null) {
+            return;
+        }
+
+        final List<FlowFile> flowFiles = poll.getFlowFiles();
+        if (flowFiles == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+        final boolean obtainKeys = 
context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean();
+        final Map<String, StatementFlowFileEnclosure> statementMap = new 
HashMap<>(); // Map SQL to a PreparedStatement and FlowFiles
+        final List<FlowFile> sentFlowFiles = new ArrayList<>();  // flowfiles 
that have been sent
+        final List<FlowFile> processedFlowFiles = new ArrayList<>(); // all 
flowfiles that we have processed
+        final Set<StatementFlowFileEnclosure> enclosuresToExecute = new 
LinkedHashSet<>(); // the enclosures that we've processed
+
+        // Because we can have a transaction that is necessary across multiple 
FlowFiles, things get complicated when
+        // some FlowFiles have been transferred to a relationship and then 
there is a failure. As a result, we will just
+        // map all FlowFiles to their destination relationship and do the 
session.transfer at the end. This way, if there
+        // is a failure, we can route all FlowFiles to failure if we need to.
+        final Map<FlowFile, Relationship> destinationRelationships = new 
HashMap<>();
+
+        final DBCPService dbcpService = 
context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
+        try (final Connection conn = dbcpService.getConnection()) {
+            final boolean originalAutoCommit = conn.getAutoCommit();
+            try {
+                conn.setAutoCommit(false);
+
+                for (final FlowFile flowFile : flowFiles) {
+                    processedFlowFiles.add(flowFile);
+                    final String sql = getSQL(session, flowFile);
+
+                    // Get the appropriate PreparedStatement to use.
+                    final StatementFlowFileEnclosure enclosure;
+                    try {
+                        enclosure = getEnclosure(sql, conn, statementMap, 
obtainKeys, poll.isFragmentedTransaction());
+                    } catch (final SQLNonTransientException e) {
+                        getLogger().error("Failed to update database for {} 
due to {}; routing to failure", new Object[] {flowFile, e});
+                        destinationRelationships.put(flowFile, REL_FAILURE);
+                        continue;
+                    }
+
+                    final PreparedStatement stmt = enclosure.getStatement();
+
+                    // set the appropriate parameters on the statement.
+                    try {
+                        setParameters(stmt, flowFile.getAttributes());
+                    } catch (final SQLException | ProcessException pe) {
+                        getLogger().error("Cannot update database for {} due 
to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
+                        destinationRelationships.put(flowFile, REL_FAILURE);
+                        continue;
+                    }
+
+                    // If we need to obtain keys, we cannot do so in a a Batch 
Update. So we have to execute the statement and close it.
+                    if (obtainKeys) {
+                        try {
+                            // Execute the actual update.
+                            stmt.executeUpdate();
+
+                            // attempt to determine the key that was 
generated, if any. This is not supported by all
+                            // database vendors, so if we cannot determine the 
generated key (or if the statement is not an INSERT),
+                            // we will just move on without setting the 
attribute.
+                            FlowFile sentFlowFile = flowFile;
+                            final String generatedKey = 
determineGeneratedKey(stmt);
+                            if (generatedKey != null) {
+                                sentFlowFile = 
session.putAttribute(sentFlowFile, "sql.generated.key", generatedKey);
+                            }
+
+                            stmt.close();
+                            sentFlowFiles.add(sentFlowFile);
+                        } catch (final SQLNonTransientException e) {
+                            getLogger().error("Failed to update database for 
{} due to {}; routing to failure", new Object[] {flowFile, e});
+                            destinationRelationships.put(flowFile, 
REL_FAILURE);
+                            continue;
+                        } catch (final SQLException e) {
+                            getLogger().error("Failed to update database for 
{} due to {}; it is possible that retrying the operation will succeed, so 
routing to retry", new Object[] {flowFile, e});
+                            destinationRelationships.put(flowFile, REL_RETRY);
+                            continue;
+                        }
+                    } else {
+                        // We don't need to obtain keys. Just add the 
statement to the batch.
+                        stmt.addBatch();
+                        enclosure.addFlowFile(flowFile);
+                        enclosuresToExecute.add(enclosure);
+                    }
+                }
+
+                // If we are not trying to obtain the generated keys, we will 
have
+                // PreparedStatement's that have batches added to them. We 
need to execute each batch and close
+                // the PreparedStatement.
+                for (final StatementFlowFileEnclosure enclosure : 
enclosuresToExecute) {
+                    try {
+                        final PreparedStatement stmt = 
enclosure.getStatement();
+                        stmt.executeBatch();
+                        sentFlowFiles.addAll(enclosure.getFlowFiles());
+                    } catch (final BatchUpdateException e) {
+                        // If we get a BatchUpdateException, then we want to 
determine which FlowFile caused the failure,
+                        // and route that FlowFile to failure while routing 
those that finished processing to success and those
+                        // that have not yet been executed to retry. If the 
FlowFile was
+                        // part of a fragmented transaction, then we must roll 
back all updates for this connection, because
+                        // other statements may have been successful and been 
part of this transaction.
+                        final int[] updateCounts = e.getUpdateCounts();
+                        final int offendingFlowFileIndex = updateCounts.length;
+                        final List<FlowFile> batchFlowFiles = 
enclosure.getFlowFiles();
+
+                        if (poll.isFragmentedTransaction()) {
+                            // There are potentially multiple statements for 
this one transaction. As a result,
+                            // we need to roll back the entire transaction and 
route all of the FlowFiles to failure.
+                            conn.rollback();
+                            final FlowFile offendingFlowFile = 
batchFlowFiles.get(offendingFlowFileIndex);
+                            getLogger().error("Failed to update database due 
to a failed batch update. A total of {} FlowFiles are required for this 
transaction, so routing all to failure. "
+                                    + "Offending FlowFile was {}, which caused 
the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, 
e});
+                            session.transfer(flowFiles, REL_FAILURE);
+                            return;
+                        }
+
+                        // In the presence of a BatchUpdateException, the 
driver has the option of either stopping when an error
+                        // occurs, or continuing. If it continues, then it 
must account for all statements in the batch and for
+                        // those that fail return a Statement.EXECUTE_FAILED 
for the number of rows updated.
+                        // So we will iterate over all of the update counts 
returned. If any is equal to Statement.EXECUTE_FAILED,
+                        // we will route the corresponding FlowFile to 
failure. Otherwise, the FlowFile will go to success
+                        // unless it has not yet been processed (its index in 
the List > updateCounts.length).
+                        int failureCount = 0;
+                        int successCount = 0;
+                        int retryCount = 0;
+                        for (int i=0; i < updateCounts.length; i++) {
+                            final int updateCount = updateCounts[i];
+                            final FlowFile flowFile = batchFlowFiles.get(i);
+                            if (updateCount == Statement.EXECUTE_FAILED) {
+                                destinationRelationships.put(flowFile, 
REL_FAILURE);
+                                failureCount++;
+                            } else {
+                                destinationRelationships.put(flowFile, 
REL_SUCCESS);
+                                successCount++;
+                            }
+                        }
+
+                        if (failureCount == 0) {
+                            // if no failures found, the driver decided not to 
execute the statements after the
+                            // failure, so route the last one to failure.
+                            final FlowFile failedFlowFile = 
batchFlowFiles.get(updateCounts.length);
+                            destinationRelationships.put(failedFlowFile, 
REL_FAILURE);
+                            failureCount++;
+                        }
+
+                        if (updateCounts.length < batchFlowFiles.size()) {
+                            final List<FlowFile> unexecuted = 
batchFlowFiles.subList(updateCounts.length + 1, batchFlowFiles.size());
+                            for (final FlowFile flowFile : unexecuted) {
+                                destinationRelationships.put(flowFile, 
REL_RETRY);
+                                retryCount++;
+                            }
+                        }
+
+                        getLogger().error("Failed to update database due to a 
failed batch update. There were a total of {} FlowFiles that failed, {} that 
succeeded, "
+                                + "and {} that were not execute and will be 
routed to retry; ", new Object[] {failureCount, successCount, retryCount});
+                    } catch (final SQLNonTransientException e) {
+                        getLogger().error("Failed to update database for {} 
due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e});
+
+                        for (final FlowFile flowFile : 
enclosure.getFlowFiles()) {
+                            destinationRelationships.put(flowFile, 
REL_FAILURE);
+                        }
+                        continue;
+                    } catch (final SQLException e) {
+                        getLogger().error("Failed to update database for {} 
due to {}; it is possible that retrying the operation will succeed, so routing 
to retry",
+                                new Object[] {enclosure.getFlowFiles(), e});
+
+                        for (final FlowFile flowFile : 
enclosure.getFlowFiles()) {
+                            destinationRelationships.put(flowFile, REL_RETRY);
+                        }
+                        continue;
+                    } finally {
+                        enclosure.getStatement().close();
+                    }
+                }
+            } finally {
+                try {
+                    conn.commit();
+                } finally {
+                    // make sure that we try to set the auto commit back to 
whatever it was.
+                    if (originalAutoCommit) {
+                        try {
+                            conn.setAutoCommit(originalAutoCommit);
+                        } catch (final SQLException se) {
+                        }
+                    }
+                }
+            }
+
+            // Determine the database URL
+            String url = "jdbc://unknown-host";
+            try {
+                url = conn.getMetaData().getURL();
+            } catch (final SQLException sqle) {
+            }
+
+            // Emit a Provenance SEND event
+            final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            for (final FlowFile flowFile : sentFlowFiles) {
+                session.getProvenanceReporter().send(flowFile, url, 
transmissionMillis, true);
+            }
+
+            for (final FlowFile flowFile : sentFlowFiles) {
+                destinationRelationships.put(flowFile, REL_SUCCESS);
+            }
+        } catch (final SQLException e) {
+            // Failed FlowFiles are all of them that we have processed minus 
those that were successfully sent
+            final List<FlowFile> failedFlowFiles = processedFlowFiles;
+            failedFlowFiles.removeAll(sentFlowFiles);
+
+            // All FlowFiles yet to be processed is all FlowFiles minus those 
processed
+            final List<FlowFile> retry = flowFiles;
+            retry.removeAll(processedFlowFiles);
+
+            final Relationship rel;
+            if (e instanceof SQLNonTransientException) {
+                getLogger().error("Failed to update database for {} due to {}; 
routing to failure", new Object[] {failedFlowFiles, e});
+                rel = REL_FAILURE;
+            } else {
+                getLogger().error("Failed to update database for {} due to {}; 
it is possible that retrying the operation will succeed, so routing to retry", 
new Object[] {failedFlowFiles, e});
+                rel = REL_RETRY;
+            }
+
+            for (final FlowFile flowFile : failedFlowFiles) {
+                destinationRelationships.put(flowFile, rel);
+            }
+
+            for (final FlowFile flowFile : retry) {
+                destinationRelationships.put(flowFile, Relationship.SELF);
+            }
+        }
+
+        for (final Map.Entry<FlowFile, Relationship> entry : 
destinationRelationships.entrySet()) {
+            session.transfer(entry.getKey(), entry.getValue());
+        }
+    }
+
+
+    /**
+     * Pulls a batch of FlowFiles from the incoming queues.
+     *
+     * If the Support Fragmented Transactions property is true, FlowFiles are selected by a
+     * TransactionalFlowFileFilter; otherwise up to Batch Size FlowFiles are pulled.
+     *
+     * If the FlowFiles pulled belong to a fragmented transaction and determineRelationship selects a
+     * relationship for them, the FlowFiles are transferred to that relationship (after being penalized
+     * if the relationship is SELF) and <code>null</code> is returned, because nothing is left to process.
+     *
+     * Otherwise, the FlowFiles of a fragmented transaction are sorted by their fragment index so that
+     * they are evaluated in the order in which they should be executed.
+     *
+     * @param context the process context for determining properties
+     * @param session the process session for pulling flowfiles
+     * @return a FlowFilePoll containing a List of FlowFiles to process, or <code>null</code> if there are no FlowFiles to process
+     */
+    private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSession session) {
+        // Determine which FlowFile Filter to use in order to obtain FlowFiles.
+        final boolean useTransactions = context.getProperty(SUPPORT_TRANSACTIONS).asBoolean();
+        boolean fragmentedTransaction = false;
+
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFiles;
+        if (useTransactions) {
+            final TransactionalFlowFileFilter filter = new TransactionalFlowFileFilter();
+            flowFiles = session.get(filter);
+            fragmentedTransaction = filter.isFragmentedTransaction();
+        } else {
+            flowFiles = session.get(batchSize);
+        }
+
+        if (flowFiles.isEmpty()) {
+            return null;
+        }
+
+        // If we are supporting fragmented transactions, verify that all FlowFiles are correct
+        if (fragmentedTransaction) {
+            final Relationship relationship = determineRelationship(flowFiles, context.getProperty(TRANSACTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS));
+            if (relationship != null) {
+                // if transferring back to self, penalize the FlowFiles.
+                if (relationship == Relationship.SELF) {
+                    // session.penalize returns a new FlowFile object, so swap each element in place
+                    // to make sure that the penalized version is the one that gets transferred.
+                    final ListIterator<FlowFile> itr = flowFiles.listIterator();
+                    while (itr.hasNext()) {
+                        itr.set(session.penalize(itr.next()));
+                    }
+                }
+
+                session.transfer(flowFiles, relationship);
+                return null;
+            }
+
+            // sort by fragment index. determineRelationship has already verified that the
+            // fragment.index attribute of each FlowFile in a multi-FlowFile transaction is an integer.
+            Collections.sort(flowFiles, new Comparator<FlowFile>() {
+                @Override
+                public int compare(final FlowFile o1, final FlowFile o2) {
+                    return Integer.compare(Integer.parseInt(o1.getAttribute(FRAGMENT_INDEX_ATTR)), Integer.parseInt(o2.getAttribute(FRAGMENT_INDEX_ATTR)));
+                }
+            });
+        }
+
+        return new FlowFilePoll(flowFiles, fragmentedTransaction);
+    }
+
+
+    /**
+     * Returns the key that was generated from the given statement, or 
<code>null</code> if no key
+     * was generated or it could not be determined.
+     *
+     * @param stmt the statement that generated a key
+     * @return the key that was generated from the given statement, or 
<code>null</code> if no key
+     * was generated or it could not be determined.
+     */
+    private String determineGeneratedKey(final PreparedStatement stmt) {
+        try {
+            final ResultSet generatedKeys = stmt.getGeneratedKeys();
+            if (generatedKeys != null && generatedKeys.next()) {
+                return generatedKeys.getString(1);
+            }
+        } catch (final SQLException sqle) {
+            // This is not supported by all vendors. This is a best-effort 
approach.
+        }
+
+        return null;
+    }
+
+
+    /**
+     * Looks up, or creates, the StatementFlowFileEnclosure used to execute the given SQL statement.
+     *
+     * An enclosure already present in the map for the SQL is returned as-is. A newly created enclosure
+     * is stored in the map only when it is eligible for batch updates, i.e. when generated keys are
+     * not requested and the SQL does not belong to a fragmented transaction.
+     *
+     * @param sql the SQL to execute
+     * @param conn the connection from which a PreparedStatement can be created
+     * @param stmtMap the existing map of SQL to PreparedStatements
+     * @param obtainKeys whether or not we need to obtain generated keys for INSERT statements
+     * @param fragmentedTransaction whether or not the SQL pertains to a fragmented transaction
+     *
+     * @return a StatementFlowFileEnclosure to use for executing the given SQL statement
+     *
+     * @throws SQLException if unable to create the appropriate PreparedStatement
+     */
+    private StatementFlowFileEnclosure getEnclosure(final String sql, final Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap,
+            final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException {
+        final StatementFlowFileEnclosure cached = stmtMap.get(sql);
+        if (cached != null) {
+            return cached;
+        }
+
+        if (obtainKeys) {
+            // Ask for the generated keys to be returned along with the update.
+            PreparedStatement keyStmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
+            if (keyStmt == null) {
+                // When Statement.RETURN_GENERATED_KEYS is passed, conn.prepareStatement will in some
+                // cases (at least for DerbyDB) return null. Fall back to compiling the statement
+                // without requesting the generated keys.
+                keyStmt = conn.prepareStatement(sql);
+            }
+
+            // Obtaining keys rules out a Batch Update, so there is no addBatch/executeBatch and no
+            // reason to keep the PreparedStatement in the Map: it is used once and then closed.
+            return new StatementFlowFileEnclosure(keyStmt);
+        }
+
+        final StatementFlowFileEnclosure created = new StatementFlowFileEnclosure(conn.prepareStatement(sql));
+
+        // A transaction that spans multiple FlowFiles must not use Batch Updates, because the Map is
+        // keyed by SQL text and could therefore cause statements to be processed out of order: for
+        // Stmt A, then Stmt B, then Stmt A with different parameters, both executions of Stmt A would
+        // be evaluated before Stmt B. The code could probably be refactored to allow for this, but as
+        // it stands only non-fragmented statements are shared through the Map.
+        if (!fragmentedTransaction) {
+            stmtMap.put(sql, created);
+        }
+
+        return created;
+    }
+
+
+    /**
+     * Reads the entire content of the given FlowFile and returns it as the SQL statement to execute.
+     * The content is decoded as UTF-8.
+     *
+     * @param session the session that can be used to access the given FlowFile
+     * @param flowFile the FlowFile whose SQL statement should be executed
+     *
+     * @return the SQL that is associated with the given FlowFile
+     */
+    private String getSQL(final ProcessSession session, final FlowFile flowFile) {
+        // The buffer is sized to hold the FlowFile's full content.
+        final byte[] sqlBytes = new byte[(int) flowFile.getSize()];
+
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, sqlBytes);
+            }
+        });
+
+        // The FlowFile's content is the SQL text itself.
+        return new String(sqlBytes, StandardCharsets.UTF_8);
+    }
+
+
+    /**
+     * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes.
+     *
+     * Every attribute whose name matches SQL_TYPE_ATTRIBUTE_PATTERN describes one parameter: the first
+     * capture group of the name is the parameter index, the attribute's value is the numeric JDBC type,
+     * and the parameter's value is read from the "sql.args.&lt;index&gt;.value" attribute.
+     *
+     * @param stmt the statement to set the parameters on
+     * @param attributes the attributes from which to derive parameter indices, values, and types
+     * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called
+     * @throws ProcessException if a type attribute is not numeric or a value cannot be converted to its declared type
+     */
+    private void setParameters(final PreparedStatement stmt, final Map<String, String> attributes) throws SQLException {
+        for (final Map.Entry<String, String> attribute : attributes.entrySet()) {
+            final String typeAttrName = attribute.getKey();
+            final Matcher typeMatcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(typeAttrName);
+            if (!typeMatcher.matches()) {
+                // Not a parameter type attribute; nothing to set for it.
+                continue;
+            }
+
+            final int parameterIndex = Integer.parseInt(typeMatcher.group(1));
+
+            final String typeAttrValue = attribute.getValue();
+            if (!NUMBER_PATTERN.matcher(typeAttrValue).matches()) {
+                throw new ProcessException("Value of the " + typeAttrName + " attribute is '" + typeAttrValue + "', which is not a valid JDBC numeral type");
+            }
+
+            final int jdbcType = Integer.parseInt(typeAttrValue);
+            final String valueAttrName = "sql.args." + parameterIndex + ".value";
+            final String parameterValue = attributes.get(valueAttrName);
+
+            try {
+                setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType);
+            } catch (final NumberFormatException nfe) {
+                // The value could not be parsed as the numeric/temporal type that was declared for it.
+                throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe);
+            }
+        }
+    }
+
+
+    /**
+     * Decides whether the given FlowFiles can be processed now or must be routed somewhere instead.
+     *
+     * The fragment.count and fragment.index attributes of every FlowFile are validated; any missing,
+     * malformed, inconsistent or duplicated value routes all of the FlowFiles to failure. If the number
+     * of FlowFiles equals the fragment count, the transaction is complete and <code>null</code> is returned
+     * so that the FlowFiles are processed. Otherwise the FlowFiles are routed to failure once the
+     * transaction timeout has elapsed since the most recent queue date, or back to SELF to wait for
+     * the remaining fragments.
+     *
+     * @param flowFiles the FlowFiles whose relationship is to be determined
+     * @param transactionTimeoutMillis the maximum amount of time (in milliseconds) that we should wait
+     *              for all FlowFiles in a transaction to be present before routing to failure;
+     *              <code>null</code> disables the timeout check
+     * @return the appropriate relationship to route the FlowFiles to, or <code>null</code> if the FlowFiles
+     *             should instead be processed
+     */
+    Relationship determineRelationship(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) {
+        final String routeAllSuffix = "routing all FlowFiles with this fragment.identifier to failure";
+        final int flowFileCount = flowFiles.size();
+        final BitSet seenIndices = new BitSet();
+        int expectedFragments = 0;
+
+        for (final FlowFile flowFile : flowFiles) {
+            final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR);
+            if (fragmentCount == null) {
+                if (flowFileCount == 1) {
+                    // A lone FlowFile without a fragment count is simply processed on its own.
+                    return null;
+                }
+
+                getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier "
+                        + "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()});
+                return REL_FAILURE;
+            }
+
+            final int numFragments;
+            try {
+                numFragments = Integer.parseInt(fragmentCount);
+            } catch (final NumberFormatException nfe) {
+                getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; "
+                        + routeAllSuffix, new Object[] {flowFile, fragmentCount});
+                return REL_FAILURE;
+            }
+
+            if (numFragments < 1) {
+                getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; "
+                        + routeAllSuffix, new Object[] {flowFile, fragmentCount});
+                return REL_FAILURE;
+            }
+
+            // The first FlowFile fixes the expected count; every other FlowFile has to agree with it.
+            if (expectedFragments == 0) {
+                expectedFragments = numFragments;
+            } else if (numFragments != expectedFragments) {
+                getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; "
+                        + routeAllSuffix, new Object[] {flowFile});
+                return REL_FAILURE;
+            }
+
+            final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR);
+            if (fragmentIndex == null) {
+                getLogger().error("Cannot process {} because the fragment.index attribute is missing; "
+                        + routeAllSuffix, new Object[] {flowFile});
+                return REL_FAILURE;
+            }
+
+            final int idx;
+            try {
+                idx = Integer.parseInt(fragmentIndex);
+            } catch (final NumberFormatException nfe) {
+                getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; "
+                        + routeAllSuffix, new Object[] {flowFile, fragmentIndex});
+                return REL_FAILURE;
+            }
+
+            if (idx < 0) {
+                getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not a positive integer; "
+                        + routeAllSuffix, new Object[] {flowFile, fragmentIndex});
+                return REL_FAILURE;
+            }
+
+            // Each fragment index may be used by only one FlowFile of the transaction.
+            if (seenIndices.get(idx)) {
+                getLogger().error("Cannot process {} because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier; "
+                        + routeAllSuffix, new Object[] {flowFile});
+                return REL_FAILURE;
+            }
+
+            seenIndices.set(idx);
+        }
+
+        if (expectedFragments == flowFileCount) {
+            return null;   // no relationship to route FlowFiles to yet - process the FlowFiles.
+        }
+
+        // The transaction is incomplete; find out when its most recent fragment was queued.
+        long latestQueueTime = 0L;
+        for (final FlowFile flowFile : flowFiles) {
+            final Long queueDate = flowFile.getLastQueueDate();
+            if (queueDate != null && queueDate > latestQueueTime) {
+                latestQueueTime = queueDate;
+            }
+        }
+
+        if (transactionTimeoutMillis != null && latestQueueTime > 0L && System.currentTimeMillis() - latestQueueTime > transactionTimeoutMillis) {
+            getLogger().error("The transaction timeout has expired for the following FlowFiles; they will be routed to failure: {}", new Object[] {flowFiles});
+            return REL_FAILURE;
+        }
+
+        getLogger().debug("Not enough FlowFiles for transaction. Returning all FlowFiles to queue");
+        return Relationship.SELF;  // not enough FlowFiles for this transaction. Return them all to queue.
+    }
+
+    /**
+     * Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the
+     * provided PreparedStatement.
+     *
+     * A <code>null</code> value is set as SQL NULL of the given type. DATE, TIME and TIMESTAMP values are
+     * parsed as a long number of milliseconds and passed to the corresponding constructor. DECIMAL and
+     * NUMERIC values are set as BigDecimal; BIT and BOOLEAN values are set as boolean; all character
+     * types are set as String.
+     *
+     * @param stmt the PreparedStatement to set the parameter on
+     * @param attrName the name of the attribute that the parameter is coming from - for logging purposes
+     * @param parameterIndex the index of the SQL parameter to set
+     * @param parameterValue the value of the SQL parameter to set
+     * @param jdbcType the JDBC Type of the SQL parameter to set
+     * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter,
+     *             or if the JDBC type is not one of the supported types
+     * @throws NumberFormatException if the value cannot be parsed as the numeric type that the JDBC type requires
+     */
+    private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException {
+        if (parameterValue == null) {
+            stmt.setNull(parameterIndex, jdbcType);
+            return;
+        }
+
+        switch (jdbcType) {
+            case Types.BIT:
+            case Types.BOOLEAN:
+                stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
+                break;
+            case Types.TINYINT:
+                stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
+                break;
+            case Types.SMALLINT:
+                stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
+                break;
+            case Types.INTEGER:
+                stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
+                break;
+            case Types.BIGINT:
+                stmt.setLong(parameterIndex, Long.parseLong(parameterValue));
+                break;
+            case Types.REAL:
+                stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue));
+                break;
+            case Types.FLOAT:
+            case Types.DOUBLE:
+                stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue));
+                break;
+            case Types.DECIMAL:
+            case Types.NUMERIC:
+                // Fully qualified so that no additional import is required. The BigDecimal constructor
+                // throws NumberFormatException for malformed input, just like the other numeric parsers.
+                stmt.setBigDecimal(parameterIndex, new java.math.BigDecimal(parameterValue));
+                break;
+            case Types.DATE:
+                stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue)));
+                break;
+            case Types.TIME:
+                stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue)));
+                break;
+            case Types.TIMESTAMP:
+                stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue)));
+                break;
+            case Types.CHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+            case Types.NCHAR:
+            case Types.NVARCHAR:
+            case Types.LONGNVARCHAR:
+                stmt.setString(parameterIndex, parameterValue);
+                break;
+            default:
+                throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue
+                        + "' and a type of '" + jdbcType + "' but this is not a known data type");
+        }
+    }
+
+
+    /**
+     * A FlowFileFilter that is responsible for ensuring that the FlowFiles returned either belong
+     * to the same "fragmented transaction" (i.e., 1 transaction whose information is fragmented
+     * across multiple FlowFiles) or that none of the FlowFiles belongs to a fragmented transaction.
+     *
+     * A FlowFile is treated as not belonging to a fragmented transaction when it has no fragment
+     * identifier or when its fragment count is "1". The first FlowFile accepted decides which of the
+     * two modes the filter operates in. The filter is stateful, so a new instance is needed for each poll.
+     */
+    static class TransactionalFlowFileFilter implements FlowFileFilter {
+        private String selectedId = null;
+        private int numSelected = 0;
+        private boolean ignoreFragmentIdentifiers = false;
+
+        /**
+         * @return <code>false</code> if the first FlowFile accepted was not part of a fragmented transaction,
+         *         <code>true</code> otherwise (including when no FlowFile has been offered to the filter yet)
+         */
+        public boolean isFragmentedTransaction() {
+            return !ignoreFragmentIdentifiers;
+        }
+
+        @Override
+        public FlowFileFilterResult filter(final FlowFile flowFile) {
+            final String fragmentId = flowFile.getAttribute(FRAGMENT_ID_ATTR);
+            final String fragCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR);
+            final boolean standalone = fragmentId == null || "1".equals(fragCount);
+
+            // if first FlowFile selected is not part of a fragmented transaction, then
+            // we accept any FlowFile that is also not part of a fragmented transaction.
+            if (ignoreFragmentIdentifiers) {
+                return standalone ? FlowFileFilterResult.ACCEPT_AND_CONTINUE : FlowFileFilterResult.REJECT_AND_CONTINUE;
+            }
+
+            if (standalone) {
+                if (selectedId == null) {
+                    // Only one FlowFile in the transaction.
+                    ignoreFragmentIdentifiers = true;
+                    return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+                }
+
+                // we've already selected 1 FlowFile, and this one doesn't match.
+                return FlowFileFilterResult.REJECT_AND_CONTINUE;
+            }
+
+            if (selectedId == null) {
+                // select this fragment id as the chosen one.
+                selectedId = fragmentId;
+                numSelected++;
+                return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+            }
+
+            if (!selectedId.equals(fragmentId)) {
+                return FlowFileFilterResult.REJECT_AND_CONTINUE;
+            }
+
+            // fragment id's match. Find out if we have all of the necessary fragments or not.
+            final int numFragments = parseFragmentCount(fragCount);
+            if (numSelected >= numFragments - 1) {
+                // We have all of the fragments we need for this transaction.
+                return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+            }
+
+            // We still need more fragments for this transaction, so accept this one and continue.
+            numSelected++;
+            return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+        }
+
+        /**
+         * Parses the fragment count, falling back to Integer.MAX_VALUE when the attribute is missing,
+         * is not numeric or does not fit into an int. With that fallback the filter keeps accepting
+         * fragments instead of failing, leaving it to the caller to validate the attribute.
+         */
+        private static int parseFragmentCount(final String fragCount) {
+            // A FlowFile may carry a fragment identifier without a fragment count; Pattern.matcher
+            // would throw a NullPointerException for a null input.
+            if (fragCount == null || !NUMBER_PATTERN.matcher(fragCount).matches()) {
+                return Integer.MAX_VALUE;
+            }
+
+            try {
+                return Integer.parseInt(fragCount);
+            } catch (final NumberFormatException nfe) {
+                // numeric, but too large to be represented as an int
+                return Integer.MAX_VALUE;
+            }
+        }
+    }
+
+
+    /**
+     * The result of polling the incoming queues: the List of FlowFiles that was pulled, together with a
+     * flag telling whether those FlowFiles form a "fragmented transaction" - that is, a collection of
+     * FlowFiles that all must be executed as a single transaction. (It is called a fragmented transaction
+     * because the information for that transaction, including SQL and the parameters, is fragmented
+     * across multiple FlowFiles.) Neither field can be reassigned after construction.
+     */
+    private static class FlowFilePoll {
+        private final boolean fragmentedTransaction;
+        private final List<FlowFile> flowFiles;
+
+        /**
+         * @param polled the FlowFiles that were pulled from the queues
+         * @param fragmented whether or not the FlowFiles make up a fragmented transaction
+         */
+        public FlowFilePoll(final List<FlowFile> polled, final boolean fragmented) {
+            this.fragmentedTransaction = fragmented;
+            this.flowFiles = polled;
+        }
+
+        /** @return whether or not the FlowFiles make up a fragmented transaction */
+        public boolean isFragmentedTransaction() {
+            return fragmentedTransaction;
+        }
+
+        /** @return the same List instance that was passed to the constructor */
+        public List<FlowFile> getFlowFiles() {
+            return flowFiles;
+        }
+    }
+
+
+    /**
+     * A simple data structure to hold a Prepared Statement and the List of FlowFiles
+     * for which that statement should be evaluated. The statement is fixed at construction
+     * time; FlowFiles are appended through {@link #addFlowFile(FlowFile)}.
+     *
+     * Equality and hash code are based solely on the Prepared Statement.
+     */
+    private static class StatementFlowFileEnclosure {
+        private final PreparedStatement statement;
+        private final List<FlowFile> flowFiles = new ArrayList<>();
+
+        public StatementFlowFileEnclosure(final PreparedStatement statement) {
+            this.statement = statement;
+        }
+
+        /** @return the Prepared Statement to evaluate */
+        public PreparedStatement getStatement() {
+            return statement;
+        }
+
+        /** @return the live (not copied) List of FlowFiles registered for the statement */
+        public List<FlowFile> getFlowFiles() {
+            return flowFiles;
+        }
+
+        /**
+         * Registers another FlowFile for which the statement should be evaluated.
+         *
+         * @param flowFile the FlowFile to add
+         */
+        public void addFlowFile(final FlowFile flowFile) {
+            this.flowFiles.add(flowFile);
+        }
+
+        @Override
+        public int hashCode() {
+            return statement.hashCode();
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if (obj == this) {
+                // equals must be reflexive: an object is always equal to itself.
+                return true;
+            }
+            if (!(obj instanceof StatementFlowFileEnclosure)) {
+                // also covers obj == null
+                return false;
+            }
+
+            final StatementFlowFileEnclosure other = (StatementFlowFileEnclosure) obj;
+            return statement.equals(other.getStatement());
+        }
+    }
+}

Reply via email to