http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
new file mode 100644
index 0000000..63073e6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestConvertJSONToSQL {
+    static String createPersons = "CREATE TABLE PERSONS (id integer primary 
key, name varchar(100), code integer)";
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
+    @BeforeClass
+    public static void setup() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+    }
+
+    @Test
+    public void testInsert() throws InitializationException, ProcessException, 
SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES 
(?, ?, ?)");
+    }
+
+    @Test
+    public void testInsertWithNullValue() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeNotExists("sql.args.3.value");
+
+        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES 
(?, ?, ?)");
+    }
+
+
+    @Test
+    public void testUpdateWithNullValue() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.1.value", "Mark");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeNotExists("sql.args.2.value");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "1");
+
+        out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE 
ID = ?");
+    }
+
+
+    @Test
+    public void testMultipleInserts() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/persons.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5);
+        final List<MockFlowFile> mffs = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL);
+        for (final MockFlowFile mff : mffs) {
+            mff.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) 
VALUES (?, ?, ?)");
+
+            for (int i=1; i <= 3; i++) {
+                mff.assertAttributeExists("sql.args." + i + ".type");
+                mff.assertAttributeExists("sql.args." + i + ".value");
+            }
+        }
+    }
+
+    @Test
+    public void testUpdateBasedOnPrimaryKey() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.1.value", "Mark");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.2.value", "48");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "1");
+
+        out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE 
ID = ?");
+    }
+
+    @Test
+    public void testUnmappedFieldBehavior() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, 
ConvertJSONToSQL.IGNORE_UNMATCHED_FIELD.getValue());
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+
+        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES 
(?, ?, ?)");
+
+        runner.clearTransferState();
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, 
ConvertJSONToSQL.FAIL_UNMATCHED_FIELD.getValue());
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testUpdateBasedOnUpdateKey() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "code");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? WHERE 
CODE = ?");
+    }
+
+    @Test
+    public void testUpdateBasedOnCompoundUpdateKey() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND 
CODE = ?");
+    }
+
+    @Test
+    public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-code.json"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testUpdateWithMalformedJson() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testInsertWithMissingField() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-id.json"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+    }
+
+
+    /**
+     * Simple implementation only for testing purposes
+     */
+    private static class MockDBCPService extends AbstractControllerService 
implements DBCPService {
+        private final String dbLocation;
+
+        public MockDBCPService(final String dbLocation) {
+            this.dbLocation = dbLocation;
+        }
+
+        @Override
+        public String getIdentifier() {
+            return "dbcp";
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                final Connection con = 
DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
+                return con;
+            } catch (final Exception e) {
+                e.printStackTrace();
+                throw new ProcessException("getConnection failed: " + e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
new file mode 100644
index 0000000..a348c9e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+public class TestPutSQL {
+    private static final String createPersons = "CREATE TABLE PERSONS (id 
integer primary key, name varchar(100), code integer)";
+    private static final String createPersonsAutoId = "CREATE TABLE PERSONS 
(id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1), name 
VARCHAR(100), code INTEGER check(code <= 100))";
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
+    @BeforeClass
+    public static void setup() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+    }
+
+    @Test
+    public void testDirectStatements() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 
'Mark', 84)".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("Mark", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+
+        runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE 
ID=1".getBytes());
+        runner.run();
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("George", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
+    public void testInsertWithGeneratedKeys() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersonsAutoId);
+            }
+        }
+
+        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true");
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 
84)".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+        final MockFlowFile mff = 
runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS).get(0);
+        mff.assertAttributeEquals("sql.generated.key", "1");
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("Mark", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+
+    @Test
+    public void testFailInMiddleWithBadStatement() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersonsAutoId);
+            }
+        }
+
+        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 
84)".getBytes());
+        runner.enqueue("INSERT INTO PERSONS".getBytes());  // intentionally 
wrong syntax
+        runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 
3)".getBytes());
+        runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 
44)".getBytes());
+        runner.run();
+
+        runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
+        runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
+    }
+
+
+    @Test
+    public void testFailInMiddleWithBadParameterType() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersonsAutoId);
+            }
+        }
+
+        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+
+        final Map<String, String> goodAttributes = new HashMap<>();
+        goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        goodAttributes.put("sql.args.1.value", "84");
+
+        final Map<String, String> badAttributes = new HashMap<>();
+        badAttributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR));
+        badAttributes.put("sql.args.1.value", "hello");
+
+        final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 
?)".getBytes();
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, badAttributes);
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, goodAttributes);
+        runner.run();
+
+        runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
+        runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
+    }
+
+
+    @Test
+    public void testFailInMiddleWithBadParameterValue() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersonsAutoId);
+            }
+        }
+
+        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+
+        final Map<String, String> goodAttributes = new HashMap<>();
+        goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        goodAttributes.put("sql.args.1.value", "84");
+
+        final Map<String, String> badAttributes = new HashMap<>();
+        badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        badAttributes.put("sql.args.1.value", "9999");
+
+        final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 
?)".getBytes();
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, badAttributes);
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, goodAttributes);
+        runner.run();
+
+        runner.assertTransferCount(PutSQL.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
+        runner.assertTransferCount(PutSQL.REL_RETRY, 2);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("Mark", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+
+
+    @Test
+    public void testStatementsWithPreparedParameters() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.1.value", "1");
+
+        attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("sql.args.2.value", "Mark");
+
+        attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.3.value", "84");
+
+        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?)".getBytes(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("Mark", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+
+        runner.clearTransferState();
+
+        attributes.clear();
+        attributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR));
+        attributes.put("sql.args.1.value", "George");
+
+        attributes.put("sql.args.2.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.2.value", "1");
+
+        runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), 
attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("George", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+
+    @Test
+    public void testMultipleStatementsWithinFlowFile() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+
+        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
+                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.1.value", "1");
+
+        attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("sql.args.2.value", "Mark");
+
+        attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.3.value", "84");
+
+        attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.4.value", "1");
+
+        runner.enqueue(sql.getBytes(), attributes);
+        runner.run();
+
+        // should fail because of the semicolon
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS");
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+
+    @Test
+    public void testWithNullParameter() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.1.value", "1");
+
+        attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("sql.args.2.value", "Mark");
+
+        attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+
+        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?)".getBytes(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("Mark", rs.getString(2));
+                assertEquals(0, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
+    public void testInvalidStatement() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+
+        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
+                "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.1.value", "1");
+
+        attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("sql.args.2.value", "Mark");
+
+        attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.3.value", "84");
+
+        attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.4.value", "1");
+
+        runner.enqueue(sql.getBytes(), attributes);
+        runner.run();
+
+        // should fail because of the semicolon
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS");
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+
+    @Test
+    public void testRetryableFailure() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        final DBCPService service = new SQLExceptionService(null);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+
+        final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
+                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.1.value", "1");
+
+        attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("sql.args.2.value", "Mark");
+
+        attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.3.value", "84");
+
+        attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.4.value", "1");
+
+        runner.enqueue(sql.getBytes(), attributes);
+        runner.run();
+
+        // should fail because of the semicolon
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1);
+    }
+
+
+    @Test
+    public void testMultipleFlowFilesSuccessfulInTransaction() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(PutSQL.BATCH_SIZE, "1");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.1.value", "1");
+
+        attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+        attributes.put("sql.args.2.value", "Mark");
+
+        attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.3.value", "84");
+
+        attributes.put("fragment.identifier", "1");
+        attributes.put("fragment.count", "2");
+        attributes.put("fragment.index", "0");
+        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?)".getBytes(), attributes);
+        runner.run();
+
+        // No FlowFiles should be transferred because there were not enough 
flowfiles with the same fragment identifier
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0);
+
+        attributes.clear();
+        attributes.put("fragment.identifier", "1");
+        attributes.put("fragment.count", "2");
+        attributes.put("fragment.index", "1");
+
+        runner.clearTransferState();
+        runner.enqueue("UPDATE PERSONS SET NAME='Leonard' WHERE 
ID=1".getBytes(), attributes);
+        runner.run();
+
+        // Both FlowFiles with fragment identifier 1 should be successful
+        runner.assertTransferCount(PutSQL.REL_SUCCESS, 2);
+        runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
+        runner.assertTransferCount(PutSQL.REL_RETRY, 0);
+        for (final MockFlowFile mff : 
runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS)) {
+            mff.assertAttributeEquals("fragment.identifier", "1");
+        }
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("Leonard", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+
+    @Test
+    public void testTransactionTimeout() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("fragment.identifier", "1");
+        attributes.put("fragment.count", "2");
+        attributes.put("fragment.index", "0");
+
+        final MockFlowFile mff = new MockFlowFile(0L) {
+            @Override
+            public Long getLastQueueDate() {
+                return System.currentTimeMillis() - 10000L;   // return 10 
seconds ago
+            }
+
+            @Override
+            public Map<String, String> getAttributes() {
+                return attributes;
+            }
+
+            @Override
+            public String getAttribute(final String attrName) {
+                return attributes.get(attrName);
+            }
+        };
+
+        runner.enqueue(mff);
+        runner.run();
+
+        // No FlowFiles should be transferred because there were not enough 
flowfiles with the same fragment identifier
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
+    }
+
+    /**
+     * Simple implementation only for testing purposes
+     */
+    private static class MockDBCPService extends AbstractControllerService 
implements DBCPService {
+        private final String dbLocation;
+
+        public MockDBCPService(final String dbLocation) {
+            this.dbLocation = dbLocation;
+        }
+
+        @Override
+        public String getIdentifier() {
+            return "dbcp";
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                final Connection conn = 
DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
+                return conn;
+            } catch (final Exception e) {
+                e.printStackTrace();
+                throw new ProcessException("getConnection failed: " + e);
+            }
+        }
+    }
+
+    /**
+     * Simple implementation only for testing purposes
+     */
+    private static class SQLExceptionService extends AbstractControllerService 
implements DBCPService {
+        private final DBCPService service;
+        private int allowedBeforeFailure = 0;
+        private int successful = 0;
+
+        public SQLExceptionService(final DBCPService service) {
+            this.service = service;
+        }
+
+        @Override
+        public String getIdentifier() {
+            return "dbcp";
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
+                if (++successful > allowedBeforeFailure) {
+                    final Connection conn = Mockito.mock(Connection.class);
+                    
Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new 
SQLException("Unit Test Generated SQLException"));
+                    return conn;
+                } else {
+                    return service.getConnection();
+                }
+            } catch (final Exception e) {
+                e.printStackTrace();
+                throw new ProcessException("getConnection failed: " + e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json
new file mode 100644
index 0000000..2f82a94
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json
@@ -0,0 +1,4 @@
+{
+       "id": 1,
+       "name": "Mark",
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-1.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-1.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-1.json
new file mode 100644
index 0000000..8f98447
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-1.json
@@ -0,0 +1,5 @@
+{
+       "id": 1,
+       "name": "Mark",
+       "code": 48
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json
new file mode 100644
index 0000000..3ebd587
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json
@@ -0,0 +1,6 @@
+{
+       "id": 1,
+       "name": "Mark",
+       "code": 48,
+       "extra": "another"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-null-code.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-null-code.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-null-code.json
new file mode 100644
index 0000000..0f491ce
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-null-code.json
@@ -0,0 +1,5 @@
+{
+       "id": 1,
+       "name": "Mark",
+       "code": null
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-code.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-code.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-code.json
new file mode 100644
index 0000000..3ff074a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-code.json
@@ -0,0 +1,4 @@
+{
+       "id": 1,
+       "name": "Mark"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-id.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-id.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-id.json
new file mode 100644
index 0000000..347cc9e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-id.json
@@ -0,0 +1,4 @@
+{
+       "name": "Mark",
+       "code": 48
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/persons.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/persons.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/persons.json
new file mode 100644
index 0000000..573b698
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/persons.json
@@ -0,0 +1,21 @@
+[{
+       "id": 1,
+       "name": "Mark",
+       "code": 48
+}, {
+       "id": 2,
+       "name": "George",
+       "code": 48
+}, {
+       "id": 3,
+       "name": "Harry",
+       "code": 21
+}, {
+       "id": 4,
+       "name": "Julie",
+       "code": 48
+}, {
+       "id": 82,
+       "name": "Frank Henry",
+       "code": 16
+}]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java
deleted file mode 100644
index 57d855f..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java
+++ /dev/null
@@ -1,600 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.util.ObjectHolder;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
-
-
-@SideEffectFree
-@SupportsBatching
-@SeeAlso(PutSQL.class)
-@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", 
"flat"})
-@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or 
INSERT SQL statement. The incoming FlowFile is expected to be "
-               + "\"flat\" JSON message, meaning that it consists of a single 
JSON element and each field maps to a simple type. If a field maps to "
-               + "a JSON object, that JSON object will be interpreted as Text. 
Upon successful conversion, the original FlowFile is routed to the 'original' "
-               + "relationship and the SQL is routed to the 'sql' 
relationship.")
-public class ConvertFlatJSONToSQL extends AbstractProcessor {
-       private static final String UPDATE_TYPE = "UPDATE";
-       private static final String INSERT_TYPE = "INSERT";
-
-       static final PropertyDescriptor CONNECTION_POOL = new 
PropertyDescriptor.Builder()
-                       .name("JDBC Connection Pool")
-                       .description("Specifies the JDBC Connection Pool to use 
in order to convert the JSON message to a SQL statement. "
-                                       + "The Connection Pool is necessary in 
order to determine the appropriate database column types.")
-                       .identifiesControllerService(DBCPService.class)
-                       .required(true)
-                       .build();
-       static final PropertyDescriptor STATEMENT_TYPE = new 
PropertyDescriptor.Builder()
-                       .name("Statement Type")
-                       .description("Specifies the type of SQL Statement to 
generate")
-                       .required(true)
-                       .allowableValues(UPDATE_TYPE, INSERT_TYPE)
-                       .build();
-       static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
-                       .name("Table Name")
-                       .description("The name of the table that the statement 
should update")
-                       .required(true)
-                       .expressionLanguageSupported(true)
-                       .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                       .build();
-       static final PropertyDescriptor CATALOG_NAME = new 
PropertyDescriptor.Builder()
-                       .name("Catalog Name")
-                       .description("The name of the catalog that the 
statement should update. This may not apply for the database that you are 
updating. In this case, leave the field empty")
-                       .required(false)
-                       .expressionLanguageSupported(true)
-                       .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                       .build();
-       static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new 
PropertyDescriptor.Builder()
-                       .name("Translate Field Names")
-                       .description("If true, the Processor will attempt to 
translate JSON field names into the appropriate column names for the table 
specified. "
-                                       + "If false, the JSON field names must 
match the column names exactly, or the column will not be updated")
-                       .allowableValues("true", "false")
-                       .defaultValue("true")
-                       .build();
-       static final PropertyDescriptor UPDATE_KEY = new 
PropertyDescriptor.Builder()
-                       .name("Update Keys")
-                       .description("A comma-separated list of column names 
that uniquely identifies a row in the database for UPDATE statements. "
-                                       + "If the Statement Type is UPDATE and 
this property is not set, the table's Primary Keys are used. "
-                                       + "In this case, if no Primary Key 
exists, the conversion to SQL will fail. "
-                                       + "This property is ignored if the 
Statement Type is INSERT")
-                       .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                       .required(false)
-                       .expressionLanguageSupported(true)
-                       .build();
-
-
-       static final Relationship REL_ORIGINAL = new Relationship.Builder()
-                       .name("original")
-                       .description("When a FlowFile is converted to SQL, the 
original JSON FlowFile is routed to this relationship")
-                       .build();
-       static final Relationship REL_SQL = new Relationship.Builder()
-                       .name("sql")
-                       .description("A FlowFile is routed to this relationship 
when its contents have successfully been converted into a SQL statement")
-                       .build();
-       static final Relationship REL_FAILURE = new Relationship.Builder()
-                       .name("failure")
-                       .description("A FlowFile is routed to this relationship 
if it cannot be converted into a SQL statement. Common causes include invalid 
JSON "
-                                       + "content or the JSON content missing 
a required field (if using an INSERT statement type).")
-                       .build();
-
-       private final Map<SchemaKey, TableSchema> schemaCache = new 
LinkedHashMap<SchemaKey, TableSchema>(100) {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               protected boolean 
removeEldestEntry(Map.Entry<SchemaKey,TableSchema> eldest) {
-                       return true;
-               }
-       };
-
-       @Override
-       protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-               final List<PropertyDescriptor> properties = new ArrayList<>();
-               properties.add(CONNECTION_POOL);
-               properties.add(STATEMENT_TYPE);
-               properties.add(TABLE_NAME);
-               properties.add(CATALOG_NAME);
-               properties.add(TRANSLATE_FIELD_NAMES);
-               properties.add(UPDATE_KEY);
-               return properties;
-       }
-
-
-       @Override
-       public Set<Relationship> getRelationships() {
-               final Set<Relationship> rels = new HashSet<>();
-               rels.add(REL_ORIGINAL);
-               rels.add(REL_SQL);
-               rels.add(REL_FAILURE);
-               return rels;
-       }
-
-
-       @OnScheduled
-       public void onScheduled(final ProcessContext context) {
-               synchronized (this) {
-                       schemaCache.clear();
-               }
-       }
-
-       @Override
-       public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
-               FlowFile flowFile = session.get();
-               if (flowFile == null) {
-                       return;
-               }
-
-               final boolean translateFieldNames = 
context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
-               final String statementType = 
context.getProperty(STATEMENT_TYPE).getValue();
-               final String updateKeys = 
context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue();
-
-               final String catalog = 
context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
-               final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-               final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
-               final boolean includePrimaryKeys = 
UPDATE_TYPE.equals(statementType) && updateKeys == null;
-
-               // get the database schema from the cache, if one exists
-               TableSchema schema;
-               synchronized (this) {
-                       schema = schemaCache.get(schemaKey);
-                       if (schema == null) {
-                               // No schema exists for this table yet. Query 
the database to determine the schema and put it into the cache.
-                               final DBCPService dbcpService = 
context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
-                               try (final Connection conn = 
dbcpService.getConnection()) {
-                                       schema = TableSchema.from(conn, 
catalog, tableName, translateFieldNames, includePrimaryKeys);
-                                       schemaCache.put(schemaKey, schema);
-                               } catch (final SQLException e) {
-                                       getLogger().error("Failed to convert {} 
into a SQL statement due to {}; routing to failure", new Object[] {flowFile, 
e.toString()}, e);
-                                       session.transfer(flowFile, REL_FAILURE);
-                                       return;
-                               }
-                       }
-               }
-
-               // Parse the JSON document
-               final ObjectMapper mapper = new ObjectMapper();
-               final ObjectHolder<JsonNode> rootNodeRef = new 
ObjectHolder<>(null);
-               try {
-                       session.read(flowFile, new InputStreamCallback() {
-                               @Override
-                               public void process(final InputStream in) 
throws IOException {
-                                       try (final InputStream bufferedIn = new 
BufferedInputStream(in)) {
-                                               
rootNodeRef.set(mapper.readTree(bufferedIn));
-                                       }
-                               }
-                       });
-               } catch (final ProcessException pe) {
-                       getLogger().error("Failed to parse {} as JSON due to 
{}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
-                       session.transfer(flowFile, REL_FAILURE);
-                       return;
-               }
-
-               final JsonNode rootNode = rootNodeRef.get();
-               final String sql;
-               final Map<String, String> attributes = new HashMap<>();
-               if (INSERT_TYPE.equals(statementType)) {
-                       try {
-                               sql = generateInsert(rootNode, attributes, 
tableName, schema, translateFieldNames);
-                       } catch (final ProcessException pe) {
-                               getLogger().error("Failed to convert {} to a 
SQL INSERT statement due to {}; routing to failure",
-                                               new Object[] { flowFile, 
pe.toString() }, pe);
-                               session.transfer(flowFile, REL_FAILURE);
-                               return;
-                       }
-               } else {
-                       try {
-                               sql = generateUpdate(rootNode, attributes, 
tableName, updateKeys, schema, translateFieldNames);
-                       } catch (final ProcessException pe) {
-                               getLogger().error("Failed to convert {} to a 
SQL UPDATE statement due to {}; routing to failure",
-                                               new Object[] { flowFile, 
pe.toString() }, pe);
-                               session.transfer(flowFile, REL_FAILURE);
-                               return;
-                       }
-               }
-
-               FlowFile sqlFlowFile = session.create(flowFile);
-               sqlFlowFile = session.write(sqlFlowFile, new 
OutputStreamCallback() {
-                       @Override
-                       public void process(final OutputStream out) throws 
IOException {
-                               out.write(sql.getBytes(StandardCharsets.UTF_8));
-                       }
-               });
-
-               attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
-               attributes.put("sql.table", tableName);
-               if (catalog != null) {
-                       attributes.put("sql.catalog", catalog);
-               }
-
-               sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes);
-
-               session.transfer(flowFile, REL_ORIGINAL);
-               session.transfer(sqlFlowFile, REL_SQL);
-       }
-
-       private Set<String> getNormalizedColumnNames(final JsonNode node, final 
boolean translateFieldNames) {
-               final Set<String> normalizedFieldNames = new HashSet<>();
-               final Iterator<String> fieldNameItr = node.getFieldNames();
-               while (fieldNameItr.hasNext()) {
-                       
normalizedFieldNames.add(normalizeColumnName(fieldNameItr.next(), 
translateFieldNames));
-               }
-
-               return normalizedFieldNames;
-       }
-
-       private String generateInsert(final JsonNode rootNode, final 
Map<String, String> attributes, final String tableName,
-                       final TableSchema schema, final boolean 
translateFieldNames) {
-
-               final Set<String> normalizedFieldNames = 
getNormalizedColumnNames(rootNode, translateFieldNames);
-               for (final String requiredColName : 
schema.getRequiredColumnNames()) {
-                       final String normalizedColName = 
normalizeColumnName(requiredColName, translateFieldNames);
-                       if (!normalizedFieldNames.contains(normalizedColName)) {
-                               throw new ProcessException("JSON does not have 
a value for the Required column '" + requiredColName + "'");
-                       }
-               }
-
-               final StringBuilder sqlBuilder = new StringBuilder();
-               int fieldCount = 0;
-               sqlBuilder.append("INSERT INTO ").append(tableName).append(" 
(");
-
-               // iterate over all of the elements in the JSON, building the 
SQL statement by adding the column names, as well as
-               // adding the column value to a "sql.args.N.value" attribute 
and the type of a "sql.args.N.type" attribute add the
-               // columns that we are inserting into
-               final Iterator<String> fieldNames = rootNode.getFieldNames();
-               while (fieldNames.hasNext()) {
-                       final String fieldName = fieldNames.next();
-
-                       final ColumnDescription desc = 
schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
-                       if (desc != null) {
-                               if (fieldCount++ > 0) {
-                                       sqlBuilder.append(", ");
-                               }
-
-                               sqlBuilder.append(desc.getColumnName());
-
-                               final int sqlType = desc.getDataType();
-                               attributes.put("sql.args." + fieldCount + 
".type", String.valueOf(sqlType));
-
-                               final Integer colSize = desc.getColumnSize();
-                               String fieldValue = 
rootNode.get(fieldName).asText();
-                               if (colSize != null && fieldValue.length() > 
colSize) {
-                                       fieldValue = fieldValue.substring(0, 
colSize);
-                               }
-                               attributes.put("sql.args." + fieldCount + 
".value", fieldValue);
-                       }
-               }
-
-               // complete the SQL statements by adding ?'s for all of the 
values to be escaped.
-               sqlBuilder.append(") VALUES (");
-               for (int i=0; i < fieldCount; i++) {
-                       if (i > 0) {
-                               sqlBuilder.append(", ");
-                       }
-
-                       sqlBuilder.append("?");
-               }
-               sqlBuilder.append(")");
-
-               if (fieldCount == 0) {
-                       throw new ProcessException("None of the fields in the 
JSON map to the columns defined by the " + tableName + " table");
-               }
-
-               return sqlBuilder.toString();
-       }
-
-       private String generateUpdate(final JsonNode rootNode, final 
Map<String, String> attributes, final String tableName, final String updateKeys,
-                       final TableSchema schema, final boolean 
translateFieldNames) {
-
-               final Set<String> updateKeyNames;
-               if (updateKeys == null) {
-                       updateKeyNames = schema.getPrimaryKeyColumnNames();
-               } else {
-                       updateKeyNames = new HashSet<>();
-                       for (final String updateKey : updateKeys.split(",")) {
-                               updateKeyNames.add(updateKey.trim());
-                       }
-               }
-
-               if (updateKeyNames.isEmpty()) {
-                       throw new ProcessException("Table '" + tableName + "' 
does not have a Primary Key and no Update Keys were specified");
-               }
-
-               final StringBuilder sqlBuilder = new StringBuilder();
-               int fieldCount = 0;
-               sqlBuilder.append("UPDATE ").append(tableName).append(" SET ");
-
-
-               // Create a Set of all normalized Update Key names, and ensure 
that there is a field in the JSON
-               // for each of the Update Key fields.
-               final Set<String> normalizedFieldNames = 
getNormalizedColumnNames(rootNode, translateFieldNames);
-               final Set<String> normalizedUpdateNames = new HashSet<>();
-               for (final String uk : updateKeyNames) {
-                       final String normalizedUK = normalizeColumnName(uk, 
translateFieldNames);
-                       normalizedUpdateNames.add(normalizedUK);
-
-                       if (!normalizedFieldNames.contains(normalizedUK)) {
-                               throw new ProcessException("JSON does not have 
a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column 
'" + uk + "'");
-                       }
-               }
-
-               // iterate over all of the elements in the JSON, building the 
SQL statement by adding the column names, as well as
-               // adding the column value to a "sql.args.N.value" attribute 
and the type of a "sql.args.N.type" attribute add the
-               // columns that we are inserting into
-               Iterator<String> fieldNames = rootNode.getFieldNames();
-               while (fieldNames.hasNext()) {
-                       final String fieldName = fieldNames.next();
-
-                       final String normalizedColName = 
normalizeColumnName(fieldName, translateFieldNames);
-                       final ColumnDescription desc = 
schema.getColumns().get(normalizedColName);
-                       if (desc == null) {
-                               continue;
-                       }
-
-                       // Check if this column is an Update Key. If so, skip 
it for now. We will come
-                       // back to it after we finish the SET clause
-                       if (normalizedUpdateNames.contains(normalizedColName)) {
-                               continue;
-                       }
-
-                       if (fieldCount++ > 0) {
-                               sqlBuilder.append(", ");
-                       }
-
-                       sqlBuilder.append(desc.getColumnName()).append(" = ?");
-                       final int sqlType = desc.getDataType();
-                       attributes.put("sql.args." + fieldCount + ".type", 
String.valueOf(sqlType));
-
-                       final Integer colSize = desc.getColumnSize();
-                       String fieldValue = rootNode.get(fieldName).asText();
-                       if (colSize != null && fieldValue.length() > colSize) {
-                               fieldValue = fieldValue.substring(0, colSize);
-                       }
-                       attributes.put("sql.args." + fieldCount + ".value", 
fieldValue);
-               }
-
-               // Set the WHERE clause based on the Update Key values
-               sqlBuilder.append(" WHERE ");
-
-               fieldNames = rootNode.getFieldNames();
-               int whereFieldCount = 0;
-               while (fieldNames.hasNext()) {
-                       final String fieldName = fieldNames.next();
-
-                       final String normalizedColName = 
normalizeColumnName(fieldName, translateFieldNames);
-                       final ColumnDescription desc = 
schema.getColumns().get(normalizedColName);
-                       if (desc == null) {
-                               continue;
-                       }
-
-                       // Check if this column is a Update Key. If so, skip it 
for now. We will come
-                       // back to it after we finish the SET clause
-                       if (!normalizedUpdateNames.contains(normalizedColName)) 
{
-                               continue;
-                       }
-
-                       if (whereFieldCount++ > 0) {
-                               sqlBuilder.append(" AND ");
-                       }
-                       fieldCount++;
-
-                       sqlBuilder.append(normalizedColName).append(" = ?");
-                       final int sqlType = desc.getDataType();
-                       attributes.put("sql.args." + fieldCount + ".type", 
String.valueOf(sqlType));
-
-                       final Integer colSize = desc.getColumnSize();
-                       String fieldValue = rootNode.get(fieldName).asText();
-                       if (colSize != null && fieldValue.length() > colSize) {
-                               fieldValue = fieldValue.substring(0, colSize);
-                       }
-                       attributes.put("sql.args." + fieldCount + ".value", 
fieldValue);
-               }
-
-               return sqlBuilder.toString();
-       }
-
-       private static String normalizeColumnName(final String colName, final 
boolean translateColumnNames) {
-               return translateColumnNames ? 
colName.toUpperCase().replace("_", "") : colName;
-       }
-
-       private static class TableSchema {
-               private List<String> requiredColumnNames;
-               private Set<String> primaryKeyColumnNames;
-               private Map<String, ColumnDescription> columns;
-
-               private TableSchema(final List<ColumnDescription> 
columnDescriptions, final boolean translateColumnNames,
-                               final Set<String> primaryKeyColumnNames) {
-                       this.columns = new HashMap<>();
-                       this.primaryKeyColumnNames = primaryKeyColumnNames;
-
-                       this.requiredColumnNames = new ArrayList<>();
-                       for (final ColumnDescription desc : columnDescriptions) 
{
-                               
columns.put(ConvertFlatJSONToSQL.normalizeColumnName(desc.columnName, 
translateColumnNames), desc);
-                               if (desc.isRequired()) {
-                                       
requiredColumnNames.add(desc.columnName);
-                               }
-                       }
-               }
-
-               public Map<String, ColumnDescription> getColumns() {
-                       return columns;
-               }
-
-               public List<String> getRequiredColumnNames() {
-                       return requiredColumnNames;
-               }
-
-               public Set<String> getPrimaryKeyColumnNames() {
-                       return primaryKeyColumnNames;
-               }
-
-               public static TableSchema from(final Connection conn, final 
String catalog, final String tableName,
-                               final boolean translateColumnNames, final 
boolean includePrimaryKeys) throws SQLException {
-                       final ResultSet colrs = 
conn.getMetaData().getColumns(catalog, null, tableName, "%");
-
-                       final List<ColumnDescription> cols = new ArrayList<>();
-                       while (colrs.next()) {
-                               final ColumnDescription col = 
ColumnDescription.from(colrs);
-                               cols.add(col);
-                       }
-
-                       final Set<String> primaryKeyColumns = new HashSet<>();
-                       if (includePrimaryKeys) {
-                               final ResultSet pkrs = 
conn.getMetaData().getPrimaryKeys(catalog, null, tableName);
-
-                               while (pkrs.next()) {
-                                       final String colName = 
pkrs.getString("COLUMN_NAME");
-                                       
primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames));
-                               }
-                       }
-
-                       return new TableSchema(cols, translateColumnNames, 
primaryKeyColumns);
-               }
-       }
-
-       private static class ColumnDescription {
-               private final String columnName;
-               private final int dataType;
-               private final boolean required;
-               private final Integer columnSize;
-
-               private ColumnDescription(final String columnName, final int 
dataType, final boolean required, final Integer columnSize) {
-                       this.columnName = columnName;
-                       this.dataType = dataType;
-                       this.required = required;
-                       this.columnSize = columnSize;
-               }
-
-               public int getDataType() {
-                       return dataType;
-               }
-
-               public Integer getColumnSize() {
-                       return columnSize;
-               }
-
-               public String getColumnName() {
-                       return columnName;
-               }
-
-               public boolean isRequired() {
-                       return required;
-               }
-
-               public static ColumnDescription from(final ResultSet resultSet) 
throws SQLException {
-                       final String columnName = 
resultSet.getString("COLUMN_NAME");
-                       final int dataType = resultSet.getInt("DATA_TYPE");
-                       final int colSize = resultSet.getInt("COLUMN_SIZE");
-
-                       final String nullableValue = 
resultSet.getString("IS_NULLABLE");
-                       final boolean isNullable = 
"YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty();
-                       final String defaultValue = 
resultSet.getString("COLUMN_DEF");
-                       final String autoIncrementValue = 
resultSet.getString("IS_AUTOINCREMENT");
-                       final boolean isAutoIncrement = 
"YES".equalsIgnoreCase(autoIncrementValue);
-                       final boolean required = !isNullable && 
!isAutoIncrement && defaultValue == null;
-
-                       return new ColumnDescription(columnName, dataType, 
required, colSize == 0 ? null : colSize);
-               }
-       }
-
-       private static class SchemaKey {
-               private final String catalog;
-               private final String tableName;
-
-               public SchemaKey(final String catalog, final String tableName) {
-                       this.catalog = catalog;
-                       this.tableName = tableName;
-               }
-
-               @Override
-               public int hashCode() {
-                       final int prime = 31;
-                       int result = 1;
-                       result = prime * result + ((catalog == null) ? 0 : 
catalog.hashCode());
-                       result = prime * result + ((tableName == null) ? 0 : 
tableName.hashCode());
-                       return result;
-               }
-
-               @Override
-               public boolean equals(final Object obj) {
-                       if (this == obj) {
-                               return true;
-                       }
-                       if (obj == null) {
-                               return false;
-                       }
-                       if (getClass() != obj.getClass()) {
-                               return false;
-                       }
-
-                       final SchemaKey other = (SchemaKey) obj;
-                       if (catalog == null) {
-                               if (other.catalog != null) {
-                                       return false;
-                               }
-                       } else if (!catalog.equals(other.catalog)) {
-                               return false;
-                       }
-
-
-                       if (tableName == null) {
-                               if (other.tableName != null) {
-                                       return false;
-                               }
-                       } else if (!tableName.equals(other.tableName)) {
-                               return false;
-                       }
-
-                       return true;
-               }
-       }
-}

Reply via email to