NIFI-293: Added processors ConvertFlatJSONToSQL, PutSQL

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1a37c95f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1a37c95f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1a37c95f

Branch: refs/heads/master
Commit: 1a37c95f5883cdca858c175a637c67d5f012908c
Parents: 16dc5d5
Author: Mark Payne <[email protected]>
Authored: Wed Aug 12 17:17:12 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Wed Aug 19 17:36:29 2015 -0400

----------------------------------------------------------------------
 .../src/main/resources/META-INF/NOTICE          |  23 +
 .../nifi-standard-processors/pom.xml            |   4 +
 .../org.apache.nifi.processor.Processor         |   2 +
 .../standard/ConvertFlatJSONToSQL.java          | 600 +++++++++++++++++++
 .../apache/nifi/processors/standard/PutSQL.java | 203 +++++++
 .../standard/TestConvertFlatJSONToSQL.java      | 294 +++++++++
 .../nifi/processors/standard/TestPutSQL.java    | 246 ++++++++
 .../malformed-person-extra-comma.json           |   4 +
 .../TestConvertFlatJSONToSQL/person-1.json      |   5 +
 .../person-without-code.json                    |   4 +
 .../person-without-id.json                      |   4 +
 11 files changed, 1389 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1a37c95f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
index 4a81d74..152e3bc 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
@@ -114,6 +114,29 @@ The following binary components are provided under the 
Apache Software License v
       Apache Tika Core
       Copyright 2007-2015 The Apache Software Foundation
 
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta ([email protected]), and 
has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as 
supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may licensed under different 
licenses.
+      To find the details that apply to this artifact see the accompanying 
LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+      ## Credits
+
+      A list of contributors may be found from CREDITS file, which is included
+      in some artifacts (usually source distributions); but is always available
+      from the source code management (SCM) system project uses.
+
 ************************
 Common Development and Distribution License 1.1
 ************************

http://git-wip-us.apache.org/repos/asf/nifi/blob/1a37c95f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 9946122..fd267d1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -180,6 +180,10 @@ language governing permissions and limitations under the 
License. -->
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-dbcp-service-api</artifactId>
         </dependency>
+               <dependency>
+                       <groupId>org.codehaus.jackson</groupId>
+                       <artifactId>jackson-mapper-asl</artifactId>
+               </dependency>
 
         <dependency>
             <groupId>org.apache.derby</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/1a37c95f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index e62b57f..0ce1456 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -16,6 +16,7 @@ org.apache.nifi.processors.standard.Base64EncodeContent
 org.apache.nifi.processors.standard.CompressContent
 org.apache.nifi.processors.standard.ControlRate
 org.apache.nifi.processors.standard.ConvertCharacterSet
+org.apache.nifi.processors.standard.ConvertFlatJSONToSQL
 org.apache.nifi.processors.standard.DetectDuplicate
 org.apache.nifi.processors.standard.DistributeLoad
 org.apache.nifi.processors.standard.DuplicateFlowFile
@@ -52,6 +53,7 @@ org.apache.nifi.processors.standard.PutFile
 org.apache.nifi.processors.standard.PutFTP
 org.apache.nifi.processors.standard.PutJMS
 org.apache.nifi.processors.standard.PutSFTP
+org.apache.nifi.processors.standard.PutSQL
 org.apache.nifi.processors.standard.ReplaceText
 org.apache.nifi.processors.standard.ReplaceTextWithMapping
 org.apache.nifi.processors.standard.RouteOnAttribute

http://git-wip-us.apache.org/repos/asf/nifi/blob/1a37c95f/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java
new file mode 100644
index 0000000..57d855f
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.ObjectHolder;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+@SideEffectFree
+@SupportsBatching
+@SeeAlso(PutSQL.class)
+@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", 
"flat"})
+@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or 
INSERT SQL statement. The incoming FlowFile is expected to be "
+               + "\"flat\" JSON message, meaning that it consists of a single 
JSON element and each field maps to a simple type. If a field maps to "
+               + "a JSON object, that JSON object will be interpreted as Text. 
Upon successful conversion, the original FlowFile is routed to the 'original' "
+               + "relationship and the SQL is routed to the 'sql' 
relationship.")
+public class ConvertFlatJSONToSQL extends AbstractProcessor {
+       private static final String UPDATE_TYPE = "UPDATE";
+       private static final String INSERT_TYPE = "INSERT";
+
+       static final PropertyDescriptor CONNECTION_POOL = new 
PropertyDescriptor.Builder()
+                       .name("JDBC Connection Pool")
+                       .description("Specifies the JDBC Connection Pool to use 
in order to convert the JSON message to a SQL statement. "
+                                       + "The Connection Pool is necessary in 
order to determine the appropriate database column types.")
+                       .identifiesControllerService(DBCPService.class)
+                       .required(true)
+                       .build();
+       static final PropertyDescriptor STATEMENT_TYPE = new 
PropertyDescriptor.Builder()
+                       .name("Statement Type")
+                       .description("Specifies the type of SQL Statement to 
generate")
+                       .required(true)
+                       .allowableValues(UPDATE_TYPE, INSERT_TYPE)
+                       .build();
+       static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+                       .name("Table Name")
+                       .description("The name of the table that the statement 
should update")
+                       .required(true)
+                       .expressionLanguageSupported(true)
+                       .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                       .build();
+       static final PropertyDescriptor CATALOG_NAME = new 
PropertyDescriptor.Builder()
+                       .name("Catalog Name")
+                       .description("The name of the catalog that the 
statement should update. This may not apply for the database that you are 
updating. In this case, leave the field empty")
+                       .required(false)
+                       .expressionLanguageSupported(true)
+                       .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                       .build();
+       static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new 
PropertyDescriptor.Builder()
+                       .name("Translate Field Names")
+                       .description("If true, the Processor will attempt to 
translate JSON field names into the appropriate column names for the table 
specified. "
+                                       + "If false, the JSON field names must 
match the column names exactly, or the column will not be updated")
+                       .allowableValues("true", "false")
+                       .defaultValue("true")
+                       .build();
+       static final PropertyDescriptor UPDATE_KEY = new 
PropertyDescriptor.Builder()
+                       .name("Update Keys")
+                       .description("A comma-separated list of column names 
that uniquely identifies a row in the database for UPDATE statements. "
+                                       + "If the Statement Type is UPDATE and 
this property is not set, the table's Primary Keys are used. "
+                                       + "In this case, if no Primary Key 
exists, the conversion to SQL will fail. "
+                                       + "This property is ignored if the 
Statement Type is INSERT")
+                       .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                       .required(false)
+                       .expressionLanguageSupported(true)
+                       .build();
+
+
+       static final Relationship REL_ORIGINAL = new Relationship.Builder()
+                       .name("original")
+                       .description("When a FlowFile is converted to SQL, the 
original JSON FlowFile is routed to this relationship")
+                       .build();
+       static final Relationship REL_SQL = new Relationship.Builder()
+                       .name("sql")
+                       .description("A FlowFile is routed to this relationship 
when its contents have successfully been converted into a SQL statement")
+                       .build();
+       static final Relationship REL_FAILURE = new Relationship.Builder()
+                       .name("failure")
+                       .description("A FlowFile is routed to this relationship 
if it cannot be converted into a SQL statement. Common causes include invalid 
JSON "
+                                       + "content or the JSON content missing 
a required field (if using an INSERT statement type).")
+                       .build();
+
+       /**
+        * Cache of table schemas, keyed by the SchemaKey built from the catalog and table name in onTrigger.
+        * The map is bounded: once it holds more than 100 entries, the eldest entry (in insertion order) is evicted.
+        * The map itself is not synchronized; onScheduled and onTrigger access it while holding this processor's monitor.
+        */
+       private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(100) {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               protected boolean removeEldestEntry(final Map.Entry<SchemaKey,TableSchema> eldest) {
+                       // Returning true unconditionally would evict an entry on every put, so the cache
+                       // would never retain a schema. Only evict once the size limit has been exceeded.
+                       return size() > 100;
+               }
+       };
+
+       @Override
+       protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+               final List<PropertyDescriptor> properties = new ArrayList<>();
+               properties.add(CONNECTION_POOL);
+               properties.add(STATEMENT_TYPE);
+               properties.add(TABLE_NAME);
+               properties.add(CATALOG_NAME);
+               properties.add(TRANSLATE_FIELD_NAMES);
+               properties.add(UPDATE_KEY);
+               return properties;
+       }
+
+
+       /**
+        * Returns the relationships of this processor: 'original', 'sql' and 'failure'.
+        *
+        * @return a new, mutable set holding the three relationships
+        */
+       @Override
+       public Set<Relationship> getRelationships() {
+               final Relationship[] supported = {REL_ORIGINAL, REL_SQL, REL_FAILURE};
+
+               final Set<Relationship> relationships = new HashSet<>();
+               for (final Relationship relationship : supported) {
+                       relationships.add(relationship);
+               }
+               return relationships;
+       }
+
+
+       /**
+        * Lifecycle hook (annotated with @OnScheduled) that empties the schema cache.
+        * The clear is performed while holding this processor's monitor, which is the same lock
+        * onTrigger holds while it reads or populates the cache.
+        *
+        * @param context the process context; not used by this method
+        */
+       @OnScheduled
+       public void onScheduled(final ProcessContext context) {
+               synchronized (this) {
+                       schemaCache.clear();
+               }
+       }
+
+       @Override
+       public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
+               FlowFile flowFile = session.get();
+               if (flowFile == null) {
+                       return;
+               }
+
+               final boolean translateFieldNames = 
context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
+               final String statementType = 
context.getProperty(STATEMENT_TYPE).getValue();
+               final String updateKeys = 
context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+               final String catalog = 
context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
+               final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+               final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
+               final boolean includePrimaryKeys = 
UPDATE_TYPE.equals(statementType) && updateKeys == null;
+
+               // get the database schema from the cache, if one exists
+               TableSchema schema;
+               synchronized (this) {
+                       schema = schemaCache.get(schemaKey);
+                       if (schema == null) {
+                               // No schema exists for this table yet. Query 
the database to determine the schema and put it into the cache.
+                               final DBCPService dbcpService = 
context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
+                               try (final Connection conn = 
dbcpService.getConnection()) {
+                                       schema = TableSchema.from(conn, 
catalog, tableName, translateFieldNames, includePrimaryKeys);
+                                       schemaCache.put(schemaKey, schema);
+                               } catch (final SQLException e) {
+                                       getLogger().error("Failed to convert {} 
into a SQL statement due to {}; routing to failure", new Object[] {flowFile, 
e.toString()}, e);
+                                       session.transfer(flowFile, REL_FAILURE);
+                                       return;
+                               }
+                       }
+               }
+
+               // Parse the JSON document
+               final ObjectMapper mapper = new ObjectMapper();
+               final ObjectHolder<JsonNode> rootNodeRef = new 
ObjectHolder<>(null);
+               try {
+                       session.read(flowFile, new InputStreamCallback() {
+                               @Override
+                               public void process(final InputStream in) 
throws IOException {
+                                       try (final InputStream bufferedIn = new 
BufferedInputStream(in)) {
+                                               
rootNodeRef.set(mapper.readTree(bufferedIn));
+                                       }
+                               }
+                       });
+               } catch (final ProcessException pe) {
+                       getLogger().error("Failed to parse {} as JSON due to 
{}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
+                       session.transfer(flowFile, REL_FAILURE);
+                       return;
+               }
+
+               final JsonNode rootNode = rootNodeRef.get();
+               final String sql;
+               final Map<String, String> attributes = new HashMap<>();
+               if (INSERT_TYPE.equals(statementType)) {
+                       try {
+                               sql = generateInsert(rootNode, attributes, 
tableName, schema, translateFieldNames);
+                       } catch (final ProcessException pe) {
+                               getLogger().error("Failed to convert {} to a 
SQL INSERT statement due to {}; routing to failure",
+                                               new Object[] { flowFile, 
pe.toString() }, pe);
+                               session.transfer(flowFile, REL_FAILURE);
+                               return;
+                       }
+               } else {
+                       try {
+                               sql = generateUpdate(rootNode, attributes, 
tableName, updateKeys, schema, translateFieldNames);
+                       } catch (final ProcessException pe) {
+                               getLogger().error("Failed to convert {} to a 
SQL UPDATE statement due to {}; routing to failure",
+                                               new Object[] { flowFile, 
pe.toString() }, pe);
+                               session.transfer(flowFile, REL_FAILURE);
+                               return;
+                       }
+               }
+
+               FlowFile sqlFlowFile = session.create(flowFile);
+               sqlFlowFile = session.write(sqlFlowFile, new 
OutputStreamCallback() {
+                       @Override
+                       public void process(final OutputStream out) throws 
IOException {
+                               out.write(sql.getBytes(StandardCharsets.UTF_8));
+                       }
+               });
+
+               attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+               attributes.put("sql.table", tableName);
+               if (catalog != null) {
+                       attributes.put("sql.catalog", catalog);
+               }
+
+               sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes);
+
+               session.transfer(flowFile, REL_ORIGINAL);
+               session.transfer(sqlFlowFile, REL_SQL);
+       }
+
+       /**
+        * Collects the normalized form of every field name found directly on the given JSON node.
+        *
+        * @param node the JSON node whose field names are read
+        * @param translateFieldNames passed through to normalizeColumnName for each field name
+        * @return a new set containing the normalized field names
+        */
+       private Set<String> getNormalizedColumnNames(final JsonNode node, final boolean translateFieldNames) {
+               final Set<String> normalized = new HashSet<>();
+               for (final Iterator<String> names = node.getFieldNames(); names.hasNext();) {
+                       final String fieldName = names.next();
+                       normalized.add(normalizeColumnName(fieldName, translateFieldNames));
+               }
+               return normalized;
+       }
+
+       /**
+        * Builds a parameterized SQL INSERT statement from the fields of the given JSON node.
+        * <p>
+        * Every JSON field that maps to a column of the schema contributes one column name and one '?' placeholder.
+        * For the N-th such field (counting from 1) the column's data type is stored in the "sql.args.N.type" attribute
+        * and the field's text value, truncated to the column size when one is known, in "sql.args.N.value".
+        * JSON fields that do not map to a column are ignored.
+        *
+        * @param rootNode the parsed JSON document
+        * @param attributes the map that receives the "sql.args.N.type" and "sql.args.N.value" entries
+        * @param tableName the table name placed in the statement
+        * @param schema the schema describing the table's columns
+        * @param translateFieldNames whether field and column names are normalized before being matched
+        * @return the INSERT statement text
+        * @throws ProcessException if the JSON lacks a required column, or none of its fields map to a column
+        */
+       private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
+                       final TableSchema schema, final boolean translateFieldNames) {
+
+               // Every required column must have a matching field in the JSON.
+               final Set<String> jsonFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
+               for (final String requiredColName : schema.getRequiredColumnNames()) {
+                       final boolean present = jsonFieldNames.contains(normalizeColumnName(requiredColName, translateFieldNames));
+                       if (!present) {
+                               throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
+                       }
+               }
+
+               // Build the column list and the matching list of '?' placeholders side by side.
+               final StringBuilder columnList = new StringBuilder();
+               final StringBuilder placeholders = new StringBuilder();
+               int fieldCount = 0;
+
+               for (final Iterator<String> fieldNames = rootNode.getFieldNames(); fieldNames.hasNext();) {
+                       final String fieldName = fieldNames.next();
+                       final ColumnDescription desc = schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
+                       if (desc == null) {
+                               // The field does not correspond to any column of the table.
+                               continue;
+                       }
+
+                       fieldCount++;
+                       if (fieldCount > 1) {
+                               columnList.append(", ");
+                               placeholders.append(", ");
+                       }
+                       columnList.append(desc.getColumnName());
+                       placeholders.append("?");
+
+                       final String argPrefix = "sql.args." + fieldCount;
+                       attributes.put(argPrefix + ".type", String.valueOf(desc.getDataType()));
+
+                       // Truncate the value so that it fits the column, when the column size is known.
+                       String fieldValue = rootNode.get(fieldName).asText();
+                       final Integer colSize = desc.getColumnSize();
+                       if (colSize != null && fieldValue.length() > colSize) {
+                               fieldValue = fieldValue.substring(0, colSize);
+                       }
+                       attributes.put(argPrefix + ".value", fieldValue);
+               }
+
+               if (fieldCount == 0) {
+                       throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table");
+               }
+
+               return new StringBuilder()
+                       .append("INSERT INTO ").append(tableName).append(" (")
+                       .append(columnList)
+                       .append(") VALUES (")
+                       .append(placeholders)
+                       .append(")")
+                       .toString();
+       }
+
+       /**
+        * Builds a parameterized SQL UPDATE statement from the fields of the given JSON node.
+        * <p>
+        * JSON fields that map to an Update Key column form the WHERE clause; all other JSON fields that map to a column
+        * form the SET clause. JSON fields that do not map to a column are ignored. Arguments are numbered from 1 in the
+        * order their '?' placeholders appear (SET columns first, then WHERE columns). For argument N the column's data
+        * type is stored in the "sql.args.N.type" attribute and the field's text value, truncated to the column size when
+        * one is known, in "sql.args.N.value".
+        *
+        * @param rootNode the parsed JSON document
+        * @param attributes the map that receives the "sql.args.N.type" and "sql.args.N.value" entries
+        * @param tableName the table name placed in the statement
+        * @param updateKeys comma-separated Update Key column names, or null to use the primary keys held by the schema
+        * @param schema the schema describing the table's columns and primary keys
+        * @param translateFieldNames whether field and column names are normalized before being matched
+        * @return the UPDATE statement text
+        * @throws ProcessException if no key columns are available, a key column is not a column of the table or has no
+        *             value in the JSON, or the JSON contains no non-key field that maps to a column
+        */
+       private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
+                       final TableSchema schema, final boolean translateFieldNames) {
+
+               final Set<String> updateKeyNames;
+               if (updateKeys == null) {
+                       updateKeyNames = schema.getPrimaryKeyColumnNames();
+               } else {
+                       updateKeyNames = new HashSet<>();
+                       for (final String updateKey : updateKeys.split(",")) {
+                               updateKeyNames.add(updateKey.trim());
+                       }
+               }
+
+               if (updateKeyNames.isEmpty()) {
+                       throw new ProcessException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
+               }
+
+               final StringBuilder sqlBuilder = new StringBuilder();
+               int fieldCount = 0;
+               sqlBuilder.append("UPDATE ").append(tableName).append(" SET ");
+
+
+               // Create a Set of all normalized Update Key names, and ensure that each key is a column of the table
+               // and that there is a field in the JSON for it.
+               final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
+               final Set<String> normalizedUpdateNames = new HashSet<>();
+               for (final String uk : updateKeyNames) {
+                       final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
+                       normalizedUpdateNames.add(normalizedUK);
+
+                       if (!normalizedFieldNames.contains(normalizedUK)) {
+                               throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + " Key column '" + uk + "'");
+                       }
+
+                       // A key that is not a column would be silently left out of the WHERE clause below,
+                       // which would make the UPDATE match more rows than intended.
+                       if (!schema.getColumns().containsKey(normalizedUK)) {
+                               throw new ProcessException("Update Key column '" + uk + "' is not a column of the " + tableName + " table");
+                       }
+               }
+
+               // Build the SET clause: iterate over all of the elements in the JSON, adding each non-key column to the
+               // statement and recording its type in a "sql.args.N.type" attribute and its value in a "sql.args.N.value" attribute.
+               Iterator<String> fieldNames = rootNode.getFieldNames();
+               while (fieldNames.hasNext()) {
+                       final String fieldName = fieldNames.next();
+
+                       final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
+                       final ColumnDescription desc = schema.getColumns().get(normalizedColName);
+                       if (desc == null) {
+                               continue;
+                       }
+
+                       // Check if this column is an Update Key. If so, skip it for now. We will come
+                       // back to it after we finish the SET clause
+                       if (normalizedUpdateNames.contains(normalizedColName)) {
+                               continue;
+                       }
+
+                       if (fieldCount++ > 0) {
+                               sqlBuilder.append(", ");
+                       }
+
+                       sqlBuilder.append(desc.getColumnName()).append(" = ?");
+                       final int sqlType = desc.getDataType();
+                       attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+
+                       final Integer colSize = desc.getColumnSize();
+                       String fieldValue = rootNode.get(fieldName).asText();
+                       if (colSize != null && fieldValue.length() > colSize) {
+                               fieldValue = fieldValue.substring(0, colSize);
+                       }
+                       attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+               }
+
+               // Without at least one column to set, the statement would read "UPDATE <table> SET  WHERE ...", which is not valid SQL.
+               if (fieldCount == 0) {
+                       throw new ProcessException("None of the fields in the JSON map to a non-key column of the " + tableName + " table, so there is nothing to update");
+               }
+
+               // Set the WHERE clause based on the Update Key values
+               sqlBuilder.append(" WHERE ");
+
+               fieldNames = rootNode.getFieldNames();
+               int whereFieldCount = 0;
+               while (fieldNames.hasNext()) {
+                       final String fieldName = fieldNames.next();
+
+                       final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
+                       final ColumnDescription desc = schema.getColumns().get(normalizedColName);
+                       if (desc == null) {
+                               continue;
+                       }
+
+                       // Only Update Key columns belong in the WHERE clause; every other column
+                       // was already handled by the SET clause above.
+                       if (!normalizedUpdateNames.contains(normalizedColName)) {
+                               continue;
+                       }
+
+                       if (whereFieldCount++ > 0) {
+                               sqlBuilder.append(" AND ");
+                       }
+                       fieldCount++;
+
+                       // Use the real column name, as the SET clause does. The normalized name is only a lookup key
+                       // and, when names are translated, is upper-cased with underscores removed.
+                       sqlBuilder.append(desc.getColumnName()).append(" = ?");
+                       final int sqlType = desc.getDataType();
+                       attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+
+                       final Integer colSize = desc.getColumnSize();
+                       String fieldValue = rootNode.get(fieldName).asText();
+                       if (colSize != null && fieldValue.length() > colSize) {
+                               fieldValue = fieldValue.substring(0, colSize);
+                       }
+                       attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+               }
+
+               return sqlBuilder.toString();
+       }
+
+       /**
+        * Produces the key under which a column or JSON field name is matched.
+        *
+        * @param colName the column or field name
+        * @param translateColumnNames if true, the name is upper-cased and all underscores are removed;
+        *            if false, the name is returned unchanged
+        * @return the normalized name
+        */
+       private static String normalizeColumnName(final String colName, final boolean translateColumnNames) {
+               if (!translateColumnNames) {
+                       return colName;
+               }
+
+               // Upper-case with a fixed locale. With the default locale, the result depends on the JVM's settings
+               // (e.g. under a Turkish locale "id" becomes "\u0130D"), so a field would no longer match column "ID".
+               return colName.toUpperCase(java.util.Locale.ROOT).replace("_", "");
+       }
+
+       /**
+        * Describes a database table: its columns, which of them are required, and (optionally) its
+        * primary key columns. Instances are built from JDBC metadata via {@link #from}.
+        */
+       private static class TableSchema {
+               // Names of the required columns, exactly as reported in the column descriptions (not normalized)
+               private final List<String> requiredColumnNames;
+               // Primary key column names as handed to the constructor (normalized by from())
+               private final Set<String> primaryKeyColumnNames;
+               // Column descriptions keyed by normalized column name
+               private final Map<String, ColumnDescription> columns;
+
+               private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
+                               final Set<String> primaryKeyColumnNames) {
+                       this.columns = new HashMap<>();
+                       this.primaryKeyColumnNames = primaryKeyColumnNames;
+
+                       this.requiredColumnNames = new ArrayList<>();
+                       for (final ColumnDescription desc : columnDescriptions) {
+                               columns.put(ConvertFlatJSONToSQL.normalizeColumnName(desc.columnName, translateColumnNames), desc);
+                               if (desc.isRequired()) {
+                                       requiredColumnNames.add(desc.columnName);
+                               }
+                       }
+               }
+
+               public Map<String, ColumnDescription> getColumns() {
+                       return columns;
+               }
+
+               public List<String> getRequiredColumnNames() {
+                       return requiredColumnNames;
+               }
+
+               public Set<String> getPrimaryKeyColumnNames() {
+                       return primaryKeyColumnNames;
+               }
+
+               /**
+                * Reads the schema of a table from the metadata of the given connection.
+                *
+                * @param conn the connection whose metadata is queried; it is not closed by this method
+                * @param catalog the catalog passed to the metadata queries
+                * @param tableName the name of the table to describe
+                * @param translateColumnNames whether column names are normalized (see normalizeColumnName)
+                * @param includePrimaryKeys whether the primary key columns are looked up; if false, the
+                *            resulting schema has an empty set of primary key column names
+                * @return the schema of the table
+                * @throws SQLException if the metadata cannot be read
+                */
+               public static TableSchema from(final Connection conn, final String catalog, final String tableName,
+                               final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
+                       final List<ColumnDescription> cols = new ArrayList<>();
+                       // Close the metadata ResultSet as soon as it has been consumed so that it is not leaked
+                       try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%")) {
+                               while (colrs.next()) {
+                                       cols.add(ColumnDescription.from(colrs));
+                               }
+                       }
+
+                       final Set<String> primaryKeyColumns = new HashSet<>();
+                       if (includePrimaryKeys) {
+                               try (final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName)) {
+                                       while (pkrs.next()) {
+                                               final String colName = pkrs.getString("COLUMN_NAME");
+                                               primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames));
+                                       }
+                               }
+                       }
+
+                       return new TableSchema(cols, translateColumnNames, primaryKeyColumns);
+               }
+       }
+
+       /**
+        * Describes a single table column: its name, JDBC data type, size, and whether a value is required for it.
+        */
+       private static class ColumnDescription {
+               private final String columnName;
+               private final int dataType;
+               private final boolean required;
+               // null when the size reported by the metadata is 0
+               private final Integer columnSize;
+
+               private ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) {
+                       this.columnName = columnName;
+                       this.dataType = dataType;
+                       this.required = required;
+                       this.columnSize = columnSize;
+               }
+
+               public int getDataType() {
+                       return dataType;
+               }
+
+               public Integer getColumnSize() {
+                       return columnSize;
+               }
+
+               public String getColumnName() {
+                       return columnName;
+               }
+
+               public boolean isRequired() {
+                       return required;
+               }
+
+               /**
+                * Builds a description from the current row of a column-metadata ResultSet. The row is expected to
+                * provide the COLUMN_NAME, DATA_TYPE, COLUMN_SIZE, IS_NULLABLE, COLUMN_DEF and IS_AUTOINCREMENT columns.
+                *
+                * @param resultSet the ResultSet positioned on the row to read; it is neither advanced nor closed
+                * @return the description of the column; it is marked as required only if the column is not
+                *         nullable, is not auto-incremented and has no default value
+                * @throws SQLException if a value cannot be read from the ResultSet
+                */
+               public static ColumnDescription from(final ResultSet resultSet) throws SQLException {
+                       final String columnName = resultSet.getString("COLUMN_NAME");
+                       final int dataType = resultSet.getInt("DATA_TYPE");
+                       final int colSize = resultSet.getInt("COLUMN_SIZE");
+
+                       // An empty IS_NULLABLE value is treated as nullable. A null value is handled the same way
+                       // instead of failing with a NullPointerException.
+                       final String nullableValue = resultSet.getString("IS_NULLABLE");
+                       final boolean isNullable = nullableValue == null || nullableValue.isEmpty() || "YES".equalsIgnoreCase(nullableValue);
+                       final String defaultValue = resultSet.getString("COLUMN_DEF");
+                       final String autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT");
+                       final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
+                       final boolean required = !isNullable && !isAutoIncrement && defaultValue == null;
+
+                       return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize);
+               }
+       }
+
+       /**
+        * Value object that identifies a table by catalog and table name. Two keys are equal when both
+        * their catalogs and their table names are equal; either part may be null.
+        */
+       private static class SchemaKey {
+               private final String catalog;
+               private final String tableName;
+
+               public SchemaKey(final String catalog, final String tableName) {
+                       this.catalog = catalog;
+                       this.tableName = tableName;
+               }
+
+               @Override
+               public int hashCode() {
+                       // Same 31-based combination as before: a null part contributes 0
+                       final int catalogHash = (catalog == null) ? 0 : catalog.hashCode();
+                       final int tableNameHash = (tableName == null) ? 0 : tableName.hashCode();
+                       return 31 * (31 + catalogHash) + tableNameHash;
+               }
+
+               @Override
+               public boolean equals(final Object obj) {
+                       if (obj == this) {
+                               return true;
+                       }
+                       if (obj == null || obj.getClass() != getClass()) {
+                               return false;
+                       }
+
+                       final SchemaKey that = (SchemaKey) obj;
+                       final boolean sameCatalog = (catalog == null) ? that.catalog == null : catalog.equals(that.catalog);
+                       if (!sameCatalog) {
+                               return false;
+                       }
+
+                       return (tableName == null) ? that.tableName == null : tableName.equals(that.tableName);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1a37c95f/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
new file mode 100644
index 0000000..d349a58
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.stream.io.StreamUtils;
+
+/**
+ * Processor that executes the SQL statement held in a FlowFile's content, using a connection obtained from
+ * the configured {@link DBCPService}. Statement parameters are read from the <code>sql.args.N.type</code> and
+ * <code>sql.args.N.value</code> FlowFile attributes. On success the number of updated rows is written to the
+ * <code>sql.update.count</code> attribute.
+ */
+@SupportsBatching
+@SeeAlso(ConvertFlatJSONToSQL.class)
+@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
+@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command "
+               + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+               + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+               + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
+public class PutSQL extends AbstractProcessor {
+
+       static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
+                       .name("JDBC Connection Pool")
+                       .description("Specifies the JDBC Connection Pool from which to obtain the database connection "
+                                       + "that is used to execute the SQL statement.")
+                       .identifiesControllerService(DBCPService.class)
+                       .required(true)
+                       .build();
+
+
+       static final Relationship REL_SUCCESS = new Relationship.Builder()
+                       .name("success")
+                       .description("A FlowFile is routed to this relationship after the database is successfully updated")
+                       .build();
+       static final Relationship REL_FAILURE = new Relationship.Builder()
+                       .name("failure")
+                       .description("A FlowFile is routed to this relationship if the database cannot be updated for any reason")
+                       .build();
+
+       // Matches the name of a parameter type attribute; group 1 is the parameter index
+       private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
+       // Matches a JDBC type code. Several java.sql.Types constants are negative (BIGINT is -5, for example),
+       // so an optional leading minus sign must be accepted.
+       private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
+
+       @Override
+       protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+               final List<PropertyDescriptor> properties = new ArrayList<>();
+               properties.add(CONNECTION_POOL);
+               return properties;
+       }
+
+       /**
+        * Takes one FlowFile, executes its content as a SQL statement and routes the FlowFile to 'success' or
+        * 'failure'. The FlowFile is routed to 'failure' when a parameter attribute is invalid or when the
+        * database reports an error.
+        *
+        * @param context provides the configured JDBC Connection Pool
+        * @param session the session from which the FlowFile is taken and to which it is transferred
+        * @throws ProcessException declared by the framework contract; not thrown directly by this method
+        */
+       @Override
+       public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+               FlowFile flowFile = session.get();
+               if (flowFile == null) {
+                       return;
+               }
+
+               // TODO: Batch. Pull in 50 or 100 at a time and map content of FlowFile to Set<FlowFile> that have that
+               // same content. Then execute updates in batches.
+
+               final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
+               try (final Connection conn = dbcpService.getConnection()) {
+                       // Read the SQL from the FlowFile's content
+                       final byte[] buffer = new byte[(int) flowFile.getSize()];
+                       session.read(flowFile, new InputStreamCallback() {
+                               @Override
+                               public void process(final InputStream in) throws IOException {
+                                       StreamUtils.fillBuffer(in, buffer);
+                               }
+                       });
+
+                       final String sql = new String(buffer, StandardCharsets.UTF_8);
+
+                       // Create a prepared statement and set the appropriate parameters on the statement.
+                       try (final PreparedStatement stmt = conn.prepareStatement(sql)) {
+                               for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
+                                       final String key = entry.getKey();
+                                       final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
+                                       if (!matcher.matches()) {
+                                               continue;
+                                       }
+
+                                       // The pattern only guarantees digits; a very long digit string still does not fit into an int
+                                       final Integer parameterIndex = parseInteger(matcher.group(1));
+                                       if (parameterIndex == null) {
+                                               getLogger().error("Cannot update database for {} because the '{}' attribute does not contain a valid parameter index; routing to failure",
+                                                               new Object[] {flowFile, key});
+                                               session.transfer(flowFile, REL_FAILURE);
+                                               return;
+                                       }
+
+                                       final String typeValue = entry.getValue();
+                                       final Integer jdbcType = NUMBER_PATTERN.matcher(typeValue).matches() ? parseInteger(typeValue) : null;
+                                       if (jdbcType == null) {
+                                               getLogger().error("Cannot update database for {} because the value of the '{}' attribute is '{}', which is not a valid JDBC numeral type; routing to failure",
+                                                               new Object[] {flowFile, key, typeValue});
+                                               session.transfer(flowFile, REL_FAILURE);
+                                               return;
+                                       }
+
+                                       final String valueAttrName = "sql.args." + parameterIndex + ".value";
+                                       final String parameterValue = flowFile.getAttribute(valueAttrName);
+                                       if (parameterValue == null) {
+                                               getLogger().error("Cannot update database for {} because the '{}' attribute exists but the '{}' attribute does not", new Object[] {flowFile, key, valueAttrName});
+                                               session.transfer(flowFile, REL_FAILURE);
+                                               return;
+                                       }
+
+                                       try {
+                                               setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType);
+                                       } catch (final NumberFormatException nfe) {
+                                               getLogger().error("Cannot update database for {} because the '{}' attribute has a value of '{}', "
+                                                               + "which cannot be converted into the necessary data type; routing to failure", new Object[] {flowFile, valueAttrName, parameterValue});
+                                               session.transfer(flowFile, REL_FAILURE);
+                                               return;
+                                       }
+                               }
+
+                               final int updatedRowCount = stmt.executeUpdate();
+                               flowFile = session.putAttribute(flowFile, "sql.update.count", String.valueOf(updatedRowCount));
+                       }
+
+                       // TODO: Need to expose Connection URL from DBCP Service and use it to emit a Provenance Event
+                       session.transfer(flowFile, REL_SUCCESS);
+               } catch (final SQLException e) {
+                       getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e});
+                       session.transfer(flowFile, REL_FAILURE);
+                       return;
+               }
+       }
+
+       /**
+        * Parses the given text as a base-10 int.
+        *
+        * @param text the text to parse
+        * @return the parsed value, or <code>null</code> if the text is not a valid int (for example because it is out of range)
+        */
+       private static Integer parseInteger(final String text) {
+               try {
+                       return Integer.valueOf(text);
+               } catch (final NumberFormatException nfe) {
+                       return null;
+               }
+       }
+
+       /**
+        * Converts the textual parameter value to the Java type that corresponds to the given JDBC type and
+        * sets it on the statement.
+        *
+        * @param stmt the statement on which the parameter is set
+        * @param attrName the name of the attribute that holds the value; used only in the error message
+        * @param parameterIndex the index of the parameter within the statement
+        * @param parameterValue the value of the parameter, as text. For DATE, TIME and TIMESTAMP the text is
+        *            parsed as a long and passed to the corresponding java.sql constructor
+        * @param jdbcType the JDBC type code of the parameter, as defined in {@link Types}
+        * @throws SQLException if the JDBC type is not supported or the statement rejects the parameter
+        * @throws NumberFormatException if the value cannot be parsed as the number required by the JDBC type
+        */
+       private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException {
+               switch (jdbcType) {
+                       case Types.BIT:
+                       case Types.BOOLEAN:
+                               stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
+                               break;
+                       case Types.TINYINT:
+                               stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
+                               break;
+                       case Types.SMALLINT:
+                               stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
+                               break;
+                       case Types.INTEGER:
+                               stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
+                               break;
+                       case Types.BIGINT:
+                               stmt.setLong(parameterIndex, Long.parseLong(parameterValue));
+                               break;
+                       case Types.REAL:
+                               stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue));
+                               break;
+                       case Types.FLOAT:
+                       case Types.DOUBLE:
+                               stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue));
+                               break;
+                       case Types.DECIMAL:
+                       case Types.NUMERIC:
+                               // The BigDecimal constructor throws NumberFormatException for text that is not a number
+                               stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue));
+                               break;
+                       case Types.DATE:
+                               stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue)));
+                               break;
+                       case Types.TIME:
+                               stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue)));
+                               break;
+                       case Types.TIMESTAMP:
+                               stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue)));
+                               break;
+                       case Types.CHAR:
+                       case Types.VARCHAR:
+                       case Types.NCHAR:
+                       case Types.NVARCHAR:
+                       case Types.LONGNVARCHAR:
+                       case Types.LONGVARCHAR:
+                               stmt.setString(parameterIndex, parameterValue);
+                               break;
+                       default:
+                               throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue
+                                               + "' and a type of '" + jdbcType + "' but this is not a known data type");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1a37c95f/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java
new file mode 100644
index 0000000..b2bdbba
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestConvertFlatJSONToSQL {
+    // DDL statement that the tests below execute to create the PERSONS table
+    static String createPersons = "CREATE TABLE PERSONS (id integer primary 
key, name varchar(100), code integer)";
+
+       // Temporary folder whose root is used by the tests below as the parent of the "db" directory
+       @Rule
+       public TemporaryFolder folder = new TemporaryFolder();
+
+       /**
+        * Sets the "derby.stream.error.file" system property to "target/derby.log" once, before the tests run.
+        */
+       @BeforeClass
+       public static void setup() {
+               final String derbyLogProperty = "derby.stream.error.file";
+               final String derbyLogLocation = "target/derby.log";
+               System.setProperty(derbyLogProperty, derbyLogLocation);
+       }
+
+       /**
+        * Runs the processor with Statement Type INSERT against person-1.json and checks the generated
+        * SQL statement together with its sql.args.N.type / sql.args.N.value attributes.
+        */
+       @Test
+       public void testInsert() throws InitializationException, ProcessException, SQLException, IOException {
+               final TestRunner testRunner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
+               final File databaseDir = new File(folder.getRoot(), "db");
+               final DBCPService dbcpService = new MockDBCPService(databaseDir.getAbsolutePath());
+               testRunner.addControllerService("dbcp", dbcpService);
+               testRunner.enableControllerService(dbcpService);
+
+               // Create the PERSONS table in the test database
+               try (final Connection connection = dbcpService.getConnection();
+                               final Statement statement = connection.createStatement()) {
+                       statement.executeUpdate(createPersons);
+               }
+
+               testRunner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
+               testRunner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
+               testRunner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "INSERT");
+               testRunner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
+               testRunner.run();
+
+               testRunner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
+               testRunner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
+
+               final String integerType = String.valueOf(java.sql.Types.INTEGER);
+               final String varcharType = String.valueOf(java.sql.Types.VARCHAR);
+               final MockFlowFile sqlFlowFile = testRunner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
+               sqlFlowFile.assertAttributeEquals("sql.args.1.type", integerType);
+               sqlFlowFile.assertAttributeEquals("sql.args.1.value", "1");
+               sqlFlowFile.assertAttributeEquals("sql.args.2.type", varcharType);
+               sqlFlowFile.assertAttributeEquals("sql.args.2.value", "Mark");
+               sqlFlowFile.assertAttributeEquals("sql.args.3.type", integerType);
+               sqlFlowFile.assertAttributeEquals("sql.args.3.value", "48");
+
+               sqlFlowFile.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
+       }
+
+
+       /**
+        * Runs the processor with Statement Type UPDATE and no Update Key against person-1.json and checks
+        * that the generated statement uses the ID column in its WHERE clause.
+        */
+       @Test
+       public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException {
+               final TestRunner testRunner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
+               final File databaseDir = new File(folder.getRoot(), "db");
+               final DBCPService dbcpService = new MockDBCPService(databaseDir.getAbsolutePath());
+               testRunner.addControllerService("dbcp", dbcpService);
+               testRunner.enableControllerService(dbcpService);
+
+               // Create the PERSONS table in the test database
+               try (final Connection connection = dbcpService.getConnection();
+                               final Statement statement = connection.createStatement()) {
+                       statement.executeUpdate(createPersons);
+               }
+
+               testRunner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
+               testRunner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
+               testRunner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
+               testRunner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
+               testRunner.run();
+
+               testRunner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
+               testRunner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
+
+               final String integerType = String.valueOf(java.sql.Types.INTEGER);
+               final String varcharType = String.valueOf(java.sql.Types.VARCHAR);
+               final MockFlowFile sqlFlowFile = testRunner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
+               sqlFlowFile.assertAttributeEquals("sql.args.1.type", varcharType);
+               sqlFlowFile.assertAttributeEquals("sql.args.1.value", "Mark");
+               sqlFlowFile.assertAttributeEquals("sql.args.2.type", integerType);
+               sqlFlowFile.assertAttributeEquals("sql.args.2.value", "48");
+               sqlFlowFile.assertAttributeEquals("sql.args.3.type", integerType);
+               sqlFlowFile.assertAttributeEquals("sql.args.3.value", "1");
+
+               sqlFlowFile.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?");
+       }
+
+
+       /**
+        * Runs the processor with Statement Type UPDATE and Update Key "code" against person-1.json and
+        * checks that the generated statement uses the CODE column in its WHERE clause.
+        */
+       @Test
+       public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
+               final TestRunner testRunner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
+               final File databaseDir = new File(folder.getRoot(), "db");
+               final DBCPService dbcpService = new MockDBCPService(databaseDir.getAbsolutePath());
+               testRunner.addControllerService("dbcp", dbcpService);
+               testRunner.enableControllerService(dbcpService);
+
+               // Create the PERSONS table in the test database
+               try (final Connection connection = dbcpService.getConnection();
+                               final Statement statement = connection.createStatement()) {
+                       statement.executeUpdate(createPersons);
+               }
+
+               testRunner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
+               testRunner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
+               testRunner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
+               testRunner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "code");
+               testRunner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
+               testRunner.run();
+
+               testRunner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
+               testRunner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
+
+               final String integerType = String.valueOf(java.sql.Types.INTEGER);
+               final String varcharType = String.valueOf(java.sql.Types.VARCHAR);
+               final MockFlowFile sqlFlowFile = testRunner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
+               sqlFlowFile.assertAttributeEquals("sql.args.1.type", integerType);
+               sqlFlowFile.assertAttributeEquals("sql.args.1.value", "1");
+               sqlFlowFile.assertAttributeEquals("sql.args.2.type", varcharType);
+               sqlFlowFile.assertAttributeEquals("sql.args.2.value", "Mark");
+               sqlFlowFile.assertAttributeEquals("sql.args.3.type", integerType);
+               sqlFlowFile.assertAttributeEquals("sql.args.3.value", "48");
+
+               sqlFlowFile.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? WHERE CODE = ?");
+       }
+
+       /**
+        * Runs the processor with Statement Type UPDATE and Update Key "name,  code" against person-1.json
+        * and checks that both NAME and CODE appear in the WHERE clause of the generated statement.
+        */
+       @Test
+       public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
+               final TestRunner testRunner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
+               final File databaseDir = new File(folder.getRoot(), "db");
+               final DBCPService dbcpService = new MockDBCPService(databaseDir.getAbsolutePath());
+               testRunner.addControllerService("dbcp", dbcpService);
+               testRunner.enableControllerService(dbcpService);
+
+               // Create the PERSONS table in the test database
+               try (final Connection connection = dbcpService.getConnection();
+                               final Statement statement = connection.createStatement()) {
+                       statement.executeUpdate(createPersons);
+               }
+
+               testRunner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
+               testRunner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
+               testRunner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
+               testRunner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name,  code");
+               testRunner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
+               testRunner.run();
+
+               testRunner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1);
+               testRunner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
+
+               final String integerType = String.valueOf(java.sql.Types.INTEGER);
+               final String varcharType = String.valueOf(java.sql.Types.VARCHAR);
+               final MockFlowFile sqlFlowFile = testRunner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
+               sqlFlowFile.assertAttributeEquals("sql.args.1.type", integerType);
+               sqlFlowFile.assertAttributeEquals("sql.args.1.value", "1");
+               sqlFlowFile.assertAttributeEquals("sql.args.2.type", varcharType);
+               sqlFlowFile.assertAttributeEquals("sql.args.2.value", "Mark");
+               sqlFlowFile.assertAttributeEquals("sql.args.3.type", integerType);
+               sqlFlowFile.assertAttributeEquals("sql.args.3.value", "48");
+
+               sqlFlowFile.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
+       }
+
+       /**
+        * Runs the processor with Statement Type UPDATE and Update Key "name,  code" against
+        * person-without-code.json and checks that the single FlowFile is routed to 'failure'.
+        */
+       @Test
+       public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
+               final TestRunner testRunner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
+               final File databaseDir = new File(folder.getRoot(), "db");
+               final DBCPService dbcpService = new MockDBCPService(databaseDir.getAbsolutePath());
+               testRunner.addControllerService("dbcp", dbcpService);
+               testRunner.enableControllerService(dbcpService);
+
+               // Create the PERSONS table in the test database
+               try (final Connection connection = dbcpService.getConnection();
+                               final Statement statement = connection.createStatement()) {
+                       statement.executeUpdate(createPersons);
+               }
+
+               testRunner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp");
+               testRunner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
+               testRunner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE");
+               testRunner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name,  code");
+               testRunner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json"));
+               testRunner.run();
+
+               testRunner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1);
+       }
+
+       @Test
+       public void testUpdateWithMalformedJson() throws 
InitializationException, ProcessException, SQLException, IOException {
+               final TestRunner runner = 
TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
+               final File tempDir = folder.getRoot();
+               final File dbDir = new File(tempDir, "db");
+               final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+               runner.addControllerService("dbcp", service);
+               runner.enableControllerService(service);
+
+               try (final Connection conn = service.getConnection()) {
+                       try (final Statement stmt = conn.createStatement()) {
+                               stmt.executeUpdate(createPersons);
+                       }
+               }
+
+               runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, 
"dbcp");
+               runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
+               runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, 
"UPDATE");
+               runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name,  
code");
+               
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json"));
+               runner.run();
+
+               
runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1);
+       }
+
+       @Test
+       public void testInsertWithMissingField() throws 
InitializationException, ProcessException, SQLException, IOException {
+               final TestRunner runner = 
TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
+               final File tempDir = folder.getRoot();
+               final File dbDir = new File(tempDir, "db");
+               final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+               runner.addControllerService("dbcp", service);
+               runner.enableControllerService(service);
+
+               try (final Connection conn = service.getConnection()) {
+                       try (final Statement stmt = conn.createStatement()) {
+                               stmt.executeUpdate(createPersons);
+                       }
+               }
+
+               runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, 
"dbcp");
+               runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
+               runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, 
"INSERT");
+               runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name,  
code");
+               
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json"));
+               runner.run();
+
+               
runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1);
+       }
+
+
+       /**
+        * Simple {@link DBCPService} implementation only for testing purposes. Each call to
+        * {@link #getConnection()} opens a new connection to an embedded Derby database at the
+        * location supplied to the constructor; the JDBC URL carries {@code create=true}.
+        */
+       private static class MockDBCPService extends AbstractControllerService implements DBCPService {
+               private final String dbLocation;
+
+               /**
+                * @param dbLocation path that is appended to the {@code jdbc:derby:} URL
+                */
+               public MockDBCPService(final String dbLocation) {
+                       this.dbLocation = dbLocation;
+               }
+
+               /**
+                * @return the fixed identifier {@code "dbcp"}
+                */
+               @Override
+               public String getIdentifier() {
+                       return "dbcp";
+               }
+
+               /**
+                * Loads the embedded Derby driver and opens a connection to the configured database.
+                *
+                * @return a newly opened connection; the caller is responsible for closing it
+                * @throws ProcessException if the driver cannot be loaded or the connection cannot be
+                *             opened; the underlying exception is attached as the cause
+                */
+               @Override
+               public Connection getConnection() throws ProcessException {
+                       try {
+                               Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                               return DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
+                       } catch (final Exception e) {
+                               // Chain the original exception so that its stack trace travels with the
+                               // ProcessException instead of only being dumped to stderr.
+                               throw new ProcessException("getConnection failed: " + e, e);
+                       }
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1a37c95f/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
new file mode 100644
index 0000000..ccba3e1
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestPutSQL {
+    static String createPersons = "CREATE TABLE PERSONS (id integer primary 
key, name varchar(100), code integer)";
+
+       @Rule
+       public TemporaryFolder folder = new TemporaryFolder();
+
+       @BeforeClass
+       public static void setup() {
+               System.setProperty("derby.stream.error.file", 
"target/derby.log");
+       }
+
+       @Test
+       public void testDirectStatements() throws InitializationException, 
ProcessException, SQLException, IOException {
+               final TestRunner runner = 
TestRunners.newTestRunner(PutSQL.class);
+               final File tempDir = folder.getRoot();
+               final File dbDir = new File(tempDir, "db");
+               final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+               runner.addControllerService("dbcp", service);
+               runner.enableControllerService(service);
+
+               try (final Connection conn = service.getConnection()) {
+                       try (final Statement stmt = conn.createStatement()) {
+                               stmt.executeUpdate(createPersons);
+                       }
+               }
+
+               runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+               runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 
'Mark', 84)".getBytes());
+               runner.run();
+
+               runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+               try (final Connection conn = service.getConnection()) {
+                       try (final Statement stmt = conn.createStatement()) {
+                               final ResultSet rs = stmt.executeQuery("SELECT 
* FROM PERSONS");
+                               assertTrue(rs.next());
+                               assertEquals(1, rs.getInt(1));
+                               assertEquals("Mark", rs.getString(2));
+                               assertEquals(84, rs.getInt(3));
+                               assertFalse(rs.next());
+                       }
+               }
+
+               runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE 
ID=1".getBytes());
+               runner.run();
+
+               try (final Connection conn = service.getConnection()) {
+                       try (final Statement stmt = conn.createStatement()) {
+                               final ResultSet rs = stmt.executeQuery("SELECT 
* FROM PERSONS");
+                               assertTrue(rs.next());
+                               assertEquals(1, rs.getInt(1));
+                               assertEquals("George", rs.getString(2));
+                               assertEquals(84, rs.getInt(3));
+                               assertFalse(rs.next());
+                       }
+               }
+       }
+
+
+       @Test
+       public void testStatementsWithPreparedParameters() throws 
InitializationException, ProcessException, SQLException, IOException {
+               final TestRunner runner = 
TestRunners.newTestRunner(PutSQL.class);
+               final File tempDir = folder.getRoot();
+               final File dbDir = new File(tempDir, "db");
+               final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+               runner.addControllerService("dbcp", service);
+               runner.enableControllerService(service);
+
+               try (final Connection conn = service.getConnection()) {
+                       try (final Statement stmt = conn.createStatement()) {
+                               stmt.executeUpdate(createPersons);
+                       }
+               }
+
+               runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+               final Map<String, String> attributes = new HashMap<>();
+               attributes.put("sql.args.1.type", 
String.valueOf(Types.INTEGER));
+               attributes.put("sql.args.1.value", "1");
+
+               attributes.put("sql.args.2.type", 
String.valueOf(Types.VARCHAR));
+               attributes.put("sql.args.2.value", "Mark");
+
+               attributes.put("sql.args.3.type", 
String.valueOf(Types.INTEGER));
+               attributes.put("sql.args.3.value", "84");
+
+               runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, 
?, ?)".getBytes(), attributes);
+               runner.run();
+
+               runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+               try (final Connection conn = service.getConnection()) {
+                       try (final Statement stmt = conn.createStatement()) {
+                               final ResultSet rs = stmt.executeQuery("SELECT 
* FROM PERSONS");
+                               assertTrue(rs.next());
+                               assertEquals(1, rs.getInt(1));
+                               assertEquals("Mark", rs.getString(2));
+                               assertEquals(84, rs.getInt(3));
+                               assertFalse(rs.next());
+                       }
+               }
+
+               runner.clearTransferState();
+
+               attributes.clear();
+               attributes.put("sql.args.1.type", 
String.valueOf(Types.VARCHAR));
+               attributes.put("sql.args.1.value", "George");
+
+               attributes.put("sql.args.2.type", 
String.valueOf(Types.INTEGER));
+               attributes.put("sql.args.2.value", "1");
+
+               runner.enqueue("UPDATE PERSONS SET NAME=? WHERE 
ID=?".getBytes(), attributes);
+               runner.run();
+               runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+               try (final Connection conn = service.getConnection()) {
+                       try (final Statement stmt = conn.createStatement()) {
+                               final ResultSet rs = stmt.executeQuery("SELECT 
* FROM PERSONS");
+                               assertTrue(rs.next());
+                               assertEquals(1, rs.getInt(1));
+                               assertEquals("George", rs.getString(2));
+                               assertEquals(84, rs.getInt(3));
+                               assertFalse(rs.next());
+                       }
+               }
+       }
+
+
+       @Test
+       public void testMultipleStatementsWithinFlowFile() throws 
InitializationException, ProcessException, SQLException, IOException {
+               final TestRunner runner = 
TestRunners.newTestRunner(PutSQL.class);
+               final File tempDir = folder.getRoot();
+               final File dbDir = new File(tempDir, "db");
+               final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+               runner.addControllerService("dbcp", service);
+               runner.enableControllerService(service);
+
+               try (final Connection conn = service.getConnection()) {
+                       try (final Statement stmt = conn.createStatement()) {
+                               stmt.executeUpdate(createPersons);
+                       }
+               }
+
+               runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+
+               final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES 
(?, ?, ?); " +
+                               "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+               final Map<String, String> attributes = new HashMap<>();
+               attributes.put("sql.args.1.type", 
String.valueOf(Types.INTEGER));
+               attributes.put("sql.args.1.value", "1");
+
+               attributes.put("sql.args.2.type", 
String.valueOf(Types.VARCHAR));
+               attributes.put("sql.args.2.value", "Mark");
+
+               attributes.put("sql.args.3.type", 
String.valueOf(Types.INTEGER));
+               attributes.put("sql.args.3.value", "84");
+
+               attributes.put("sql.args.4.type", 
String.valueOf(Types.INTEGER));
+               attributes.put("sql.args.4.value", "1");
+
+               runner.enqueue(sql.getBytes(), attributes);
+               runner.run();
+
+               // should fail because of the semicolon
+               runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
+
+               try (final Connection conn = service.getConnection()) {
+                       try (final Statement stmt = conn.createStatement()) {
+                               final ResultSet rs = stmt.executeQuery("SELECT 
* FROM PERSONS");
+                               assertFalse(rs.next());
+                       }
+               }
+       }
+
+
+       /**
+        * Simple implementation only for testing purposes
+        */
+       private static class MockDBCPService extends AbstractControllerService 
implements DBCPService {
+               private final String dbLocation;
+
+               public MockDBCPService(final String dbLocation) {
+                       this.dbLocation = dbLocation;
+               }
+
+               @Override
+               public String getIdentifier() {
+                       return "dbcp";
+               }
+
+               @Override
+               public Connection getConnection() throws ProcessException {
+                       try {
+                               
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                               final Connection con = 
DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
+                               return con;
+                       } catch (final Exception e) {
+                               e.printStackTrace();
+                               throw new ProcessException("getConnection 
failed: " + e);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1a37c95f/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json
new file mode 100644
index 0000000..2f82a94
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json
@@ -0,0 +1,4 @@
+{
+       "id": 1,
+       "name": "Mark",
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/1a37c95f/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json
new file mode 100644
index 0000000..8f98447
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json
@@ -0,0 +1,5 @@
+{
+       "id": 1,
+       "name": "Mark",
+       "code": 48
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/1a37c95f/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json
new file mode 100644
index 0000000..3ff074a
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json
@@ -0,0 +1,4 @@
+{
+       "id": 1,
+       "name": "Mark"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/1a37c95f/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json
new file mode 100644
index 0000000..347cc9e
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json
@@ -0,0 +1,4 @@
+{
+       "name": "Mark",
+       "code": 48
+}
\ No newline at end of file

Reply via email to