Repository: nifi
Updated Branches:
  refs/heads/master 16dc5d5fd -> 28549c2b1


http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
deleted file mode 100644
index d349a58..0000000
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.stream.io.StreamUtils;
-
-@SupportsBatching
-@SeeAlso(ConvertFlatJSONToSQL.class)
-@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
-@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content 
of an incoming FlowFile is expected to be the SQL command "
-               + "to execute. The SQL command may use the ? to escape 
parameters. In this case, the parameters to use must exist as FlowFile 
attributes "
-               + "with the naming convention sql.args.N.type and 
sql.args.N.value, where N is a positive integer. The sql.args.N.type is 
expected to be "
-               + "a number indicating the JDBC Type. The content of the 
FlowFile is expected to be in UTF-8 format.")
-public class PutSQL extends AbstractProcessor {
-
-       static final PropertyDescriptor CONNECTION_POOL = new 
PropertyDescriptor.Builder()
-                       .name("JDBC Connection Pool")
-                       .description("Specifies the JDBC Connection Pool to use 
in order to convert the JSON message to a SQL statement. "
-                                       + "The Connection Pool is necessary in 
order to determine the appropriate database column types.")
-                       .identifiesControllerService(DBCPService.class)
-                       .required(true)
-                       .build();
-
-
-       static final Relationship REL_SUCCESS = new Relationship.Builder()
-                       .name("success")
-                       .description("A FlowFile is routed to this relationship 
after the database is successfully updated")
-                       .build();
-       static final Relationship REL_FAILURE = new Relationship.Builder()
-                       .name("failure")
-                       .description("A FlowFile is routed to this relationship 
if the database cannot be updated for any reason")
-                       .build();
-
-       private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = 
Pattern.compile("sql\\.args\\.(\\d+)\\.type");
-       private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
-
-       @Override
-       protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-               final List<PropertyDescriptor> properties = new ArrayList<>();
-               properties.add(CONNECTION_POOL);
-               return properties;
-       }
-
-       @Override
-       public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
-               FlowFile flowFile = session.get();
-               if (flowFile == null) {
-                       return;
-               }
-
-               // TODO: Batch. Pull in 50 or 100 at a time and map content of 
FlowFile to Set<FlowFile> that have that
-               // same content. Then execute updates in batches.
-
-               final DBCPService dbcpService = 
context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
-               try (final Connection conn = dbcpService.getConnection()) {
-                       // Read the SQL from the FlowFile's content
-                       final byte[] buffer = new byte[(int) 
flowFile.getSize()];
-                       session.read(flowFile, new InputStreamCallback() {
-                               @Override
-                               public void process(final InputStream in) 
throws IOException {
-                                       StreamUtils.fillBuffer(in, buffer);
-                               }
-                       });
-
-                       final String sql = new String(buffer, 
StandardCharsets.UTF_8);
-
-                       // Create a prepared statement and set the appropriate 
parameters on the statement.
-                       try (final PreparedStatement stmt = 
conn.prepareStatement(sql)) {
-                               for (final Map.Entry<String, String> entry : 
flowFile.getAttributes().entrySet()) {
-                                       final String key = entry.getKey();
-                                       final Matcher matcher = 
SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
-                                       if (matcher.matches()) {
-                                               final int parameterIndex = 
Integer.parseInt(matcher.group(1));
-
-                                               final boolean isNumeric = 
NUMBER_PATTERN.matcher(entry.getValue()).matches();
-                                               if (!isNumeric) {
-                                                       
getLogger().error("Cannot update database for {} because the value of the '{}' 
attribute is '{}', which is not a valid JDBC numeral type; routing to failure",
-                                                                       new 
Object[] {flowFile, key, entry.getValue()});
-                                                       
session.transfer(flowFile, REL_FAILURE);
-                                                       return;
-                                               }
-
-                                               final int jdbcType = 
Integer.parseInt(entry.getValue());
-                                               final String valueAttrName = 
"sql.args." + parameterIndex + ".value";
-                                               final String parameterValue = 
flowFile.getAttribute(valueAttrName);
-                                               if (parameterValue == null) {
-                                                       
getLogger().error("Cannot update database for {} because the '{}' attribute 
exists but the '{}' attribute does not", new Object[] {flowFile, key, 
valueAttrName});
-                                                       
session.transfer(flowFile, REL_FAILURE);
-                                                       return;
-                                               }
-
-                                               try {
-                                                       setParameter(stmt, 
valueAttrName, parameterIndex, parameterValue, jdbcType);
-                                               } catch (final 
NumberFormatException nfe) {
-                                                       
getLogger().error("Cannot update database for {} because the '{}' attribute has 
a value of '{}', "
-                                                                       + 
"which cannot be converted into the necessary data type; routing to failure", 
new Object[] {flowFile, valueAttrName, parameterValue});
-                                                       
session.transfer(flowFile, REL_FAILURE);
-                                                       return;
-                                               }
-                                       }
-                               }
-
-                               final int updatedRowCount = 
stmt.executeUpdate();
-                               flowFile = session.putAttribute(flowFile, 
"sql.update.count", String.valueOf(updatedRowCount));
-                       }
-
-                       // TODO: Need to expose Connection URL from DBCP 
Service and use it to emit a Provenance Event
-                       session.transfer(flowFile, REL_SUCCESS);
-               } catch (final SQLException e) {
-                       getLogger().error("Failed to update database for {} due 
to {}; routing to failure", new Object[] {flowFile, e});
-                       session.transfer(flowFile, REL_FAILURE);
-                       return;
-               }
-       }
-
-       private void setParameter(final PreparedStatement stmt, final String 
attrName, final int parameterIndex, final String parameterValue, final int 
jdbcType) throws SQLException {
-               switch (jdbcType) {
-                       case Types.BIT:
-                               stmt.setBoolean(parameterIndex, 
Boolean.parseBoolean(parameterValue));
-                               break;
-                       case Types.TINYINT:
-                               stmt.setByte(parameterIndex, 
Byte.parseByte(parameterValue));
-                               break;
-                       case Types.SMALLINT:
-                               stmt.setShort(parameterIndex, 
Short.parseShort(parameterValue));
-                               break;
-                       case Types.INTEGER:
-                               stmt.setInt(parameterIndex, 
Integer.parseInt(parameterValue));
-                               break;
-                       case Types.BIGINT:
-                               stmt.setLong(parameterIndex, 
Long.parseLong(parameterValue));
-                               break;
-                       case Types.REAL:
-                               stmt.setFloat(parameterIndex, 
Float.parseFloat(parameterValue));
-                               break;
-                       case Types.FLOAT:
-                       case Types.DOUBLE:
-                               stmt.setDouble(parameterIndex, 
Double.parseDouble(parameterValue));
-                               break;
-                       case Types.DATE:
-                               stmt.setDate(parameterIndex, new 
Date(Long.parseLong(parameterValue)));
-                               break;
-                       case Types.TIME:
-                               stmt.setTime(parameterIndex, new 
Time(Long.parseLong(parameterValue)));
-                               break;
-                       case Types.TIMESTAMP:
-                               stmt.setTimestamp(parameterIndex, new 
Timestamp(Long.parseLong(parameterValue)));
-                               break;
-                       case Types.CHAR:
-                       case Types.VARCHAR:
-                       case Types.LONGNVARCHAR:
-                       case Types.LONGVARCHAR:
-                               stmt.setString(parameterIndex, parameterValue);
-                               break;
-                       default:
-                               throw new SQLException("The '" + attrName + "' 
attribute has a value of '" + parameterValue
-                                               + "' and a type of '" + 
jdbcType + "' but this is not a known data type");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java
deleted file mode 100644
index b2bdbba..0000000
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-public class TestConvertFlatJSONToSQL {
-    static String createPersons = "CREATE TABLE PERSONS (id integer primary 
key, name varchar(100), code integer)";
-
-       @Rule
-       public TemporaryFolder folder = new TemporaryFolder();
-
-       @BeforeClass
-       public static void setup() {
-               System.setProperty("derby.stream.error.file", 
"target/derby.log");
-       }
-
-       @Test
-       public void testInsert() throws InitializationException, 
ProcessException, SQLException, IOException {
-               final TestRunner runner = 
TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
-               final File tempDir = folder.getRoot();
-               final File dbDir = new File(tempDir, "db");
-               final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-               runner.addControllerService("dbcp", service);
-               runner.enableControllerService(service);
-
-               try (final Connection conn = service.getConnection()) {
-                       try (final Statement stmt = conn.createStatement()) {
-                               stmt.executeUpdate(createPersons);
-                       }
-               }
-
-               runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, 
"dbcp");
-               runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
-               runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, 
"INSERT");
-               
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
-               runner.run();
-
-               runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 
1);
-               runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
-               final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
-               out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
-               out.assertAttributeEquals("sql.args.1.value", "1");
-               out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
-               out.assertAttributeEquals("sql.args.2.value", "Mark");
-               out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
-               out.assertAttributeEquals("sql.args.3.value", "48");
-
-               out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) 
VALUES (?, ?, ?)");
-       }
-
-
-       @Test
-       public void testUpdateBasedOnPrimaryKey() throws 
InitializationException, ProcessException, SQLException, IOException {
-               final TestRunner runner = 
TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
-               final File tempDir = folder.getRoot();
-               final File dbDir = new File(tempDir, "db");
-               final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-               runner.addControllerService("dbcp", service);
-               runner.enableControllerService(service);
-
-               try (final Connection conn = service.getConnection()) {
-                       try (final Statement stmt = conn.createStatement()) {
-                               stmt.executeUpdate(createPersons);
-                       }
-               }
-
-               runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, 
"dbcp");
-               runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
-               runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, 
"UPDATE");
-               
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
-               runner.run();
-
-               runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 
1);
-               runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
-               final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
-               out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.VARCHAR));
-               out.assertAttributeEquals("sql.args.1.value", "Mark");
-               out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.INTEGER));
-               out.assertAttributeEquals("sql.args.2.value", "48");
-               out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
-               out.assertAttributeEquals("sql.args.3.value", "1");
-
-               out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? 
WHERE ID = ?");
-       }
-
-
-       @Test
-       public void testUpdateBasedOnUpdateKey() throws 
InitializationException, ProcessException, SQLException, IOException {
-               final TestRunner runner = 
TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
-               final File tempDir = folder.getRoot();
-               final File dbDir = new File(tempDir, "db");
-               final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-               runner.addControllerService("dbcp", service);
-               runner.enableControllerService(service);
-
-               try (final Connection conn = service.getConnection()) {
-                       try (final Statement stmt = conn.createStatement()) {
-                               stmt.executeUpdate(createPersons);
-                       }
-               }
-
-               runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, 
"dbcp");
-               runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
-               runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, 
"UPDATE");
-               runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "code");
-               
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
-               runner.run();
-
-               runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 
1);
-               runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
-               final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
-               out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
-               out.assertAttributeEquals("sql.args.1.value", "1");
-               out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
-               out.assertAttributeEquals("sql.args.2.value", "Mark");
-               out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
-               out.assertAttributeEquals("sql.args.3.value", "48");
-
-               out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? 
WHERE CODE = ?");
-       }
-
-       @Test
-       public void testUpdateBasedOnCompoundUpdateKey() throws 
InitializationException, ProcessException, SQLException, IOException {
-               final TestRunner runner = 
TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
-               final File tempDir = folder.getRoot();
-               final File dbDir = new File(tempDir, "db");
-               final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-               runner.addControllerService("dbcp", service);
-               runner.enableControllerService(service);
-
-               try (final Connection conn = service.getConnection()) {
-                       try (final Statement stmt = conn.createStatement()) {
-                               stmt.executeUpdate(createPersons);
-                       }
-               }
-
-               runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, 
"dbcp");
-               runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
-               runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, 
"UPDATE");
-               runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name,  
code");
-               
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json"));
-               runner.run();
-
-               runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 
1);
-               runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1);
-               final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0);
-               out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
-               out.assertAttributeEquals("sql.args.1.value", "1");
-               out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
-               out.assertAttributeEquals("sql.args.2.value", "Mark");
-               out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
-               out.assertAttributeEquals("sql.args.3.value", "48");
-
-               out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = 
? AND CODE = ?");
-       }
-
-       @Test
-       public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws 
InitializationException, ProcessException, SQLException, IOException {
-               final TestRunner runner = 
TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
-               final File tempDir = folder.getRoot();
-               final File dbDir = new File(tempDir, "db");
-               final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-               runner.addControllerService("dbcp", service);
-               runner.enableControllerService(service);
-
-               try (final Connection conn = service.getConnection()) {
-                       try (final Statement stmt = conn.createStatement()) {
-                               stmt.executeUpdate(createPersons);
-                       }
-               }
-
-               runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, 
"dbcp");
-               runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
-               runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, 
"UPDATE");
-               runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name,  
code");
-               
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json"));
-               runner.run();
-
-               
runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1);
-       }
-
-       @Test
-       public void testUpdateWithMalformedJson() throws 
InitializationException, ProcessException, SQLException, IOException {
-               final TestRunner runner = 
TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
-               final File tempDir = folder.getRoot();
-               final File dbDir = new File(tempDir, "db");
-               final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-               runner.addControllerService("dbcp", service);
-               runner.enableControllerService(service);
-
-               try (final Connection conn = service.getConnection()) {
-                       try (final Statement stmt = conn.createStatement()) {
-                               stmt.executeUpdate(createPersons);
-                       }
-               }
-
-               runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, 
"dbcp");
-               runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
-               runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, 
"UPDATE");
-               runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name,  
code");
-               
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json"));
-               runner.run();
-
-               
runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1);
-       }
-
-       @Test
-       public void testInsertWithMissingField() throws 
InitializationException, ProcessException, SQLException, IOException {
-               final TestRunner runner = 
TestRunners.newTestRunner(ConvertFlatJSONToSQL.class);
-               final File tempDir = folder.getRoot();
-               final File dbDir = new File(tempDir, "db");
-               final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-               runner.addControllerService("dbcp", service);
-               runner.enableControllerService(service);
-
-               try (final Connection conn = service.getConnection()) {
-                       try (final Statement stmt = conn.createStatement()) {
-                               stmt.executeUpdate(createPersons);
-                       }
-               }
-
-               runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, 
"dbcp");
-               runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS");
-               runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, 
"INSERT");
-               runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name,  
code");
-               
runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json"));
-               runner.run();
-
-               
runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1);
-       }
-
-
-       /**
-        * Simple implementation only for testing purposes
-        */
-       private static class MockDBCPService extends AbstractControllerService 
implements DBCPService {
-               private final String dbLocation;
-
-               public MockDBCPService(final String dbLocation) {
-                       this.dbLocation = dbLocation;
-               }
-
-               @Override
-               public String getIdentifier() {
-                       return "dbcp";
-               }
-
-               @Override
-               public Connection getConnection() throws ProcessException {
-                       try {
-                               
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-                               final Connection con = 
DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
-                               return con;
-                       } catch (final Exception e) {
-                               e.printStackTrace();
-                               throw new ProcessException("getConnection 
failed: " + e);
-                       }
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
deleted file mode 100644
index ccba3e1..0000000
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-public class TestPutSQL {
-    static String createPersons = "CREATE TABLE PERSONS (id integer primary 
key, name varchar(100), code integer)";
-
-       @Rule
-       public TemporaryFolder folder = new TemporaryFolder();
-
-       @BeforeClass
-       public static void setup() {
-               System.setProperty("derby.stream.error.file", 
"target/derby.log");
-       }
-
-       @Test
-       public void testDirectStatements() throws InitializationException, 
ProcessException, SQLException, IOException {
-               final TestRunner runner = 
TestRunners.newTestRunner(PutSQL.class);
-               final File tempDir = folder.getRoot();
-               final File dbDir = new File(tempDir, "db");
-               final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-               runner.addControllerService("dbcp", service);
-               runner.enableControllerService(service);
-
-               try (final Connection conn = service.getConnection()) {
-                       try (final Statement stmt = conn.createStatement()) {
-                               stmt.executeUpdate(createPersons);
-                       }
-               }
-
-               runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
-               runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 
'Mark', 84)".getBytes());
-               runner.run();
-
-               runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
-
-               try (final Connection conn = service.getConnection()) {
-                       try (final Statement stmt = conn.createStatement()) {
-                               final ResultSet rs = stmt.executeQuery("SELECT 
* FROM PERSONS");
-                               assertTrue(rs.next());
-                               assertEquals(1, rs.getInt(1));
-                               assertEquals("Mark", rs.getString(2));
-                               assertEquals(84, rs.getInt(3));
-                               assertFalse(rs.next());
-                       }
-               }
-
-               runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE 
ID=1".getBytes());
-               runner.run();
-
-               try (final Connection conn = service.getConnection()) {
-                       try (final Statement stmt = conn.createStatement()) {
-                               final ResultSet rs = stmt.executeQuery("SELECT 
* FROM PERSONS");
-                               assertTrue(rs.next());
-                               assertEquals(1, rs.getInt(1));
-                               assertEquals("George", rs.getString(2));
-                               assertEquals(84, rs.getInt(3));
-                               assertFalse(rs.next());
-                       }
-               }
-       }
-
-
-       @Test
-       public void testStatementsWithPreparedParameters() throws 
InitializationException, ProcessException, SQLException, IOException {
-               final TestRunner runner = 
TestRunners.newTestRunner(PutSQL.class);
-               final File tempDir = folder.getRoot();
-               final File dbDir = new File(tempDir, "db");
-               final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-               runner.addControllerService("dbcp", service);
-               runner.enableControllerService(service);
-
-               try (final Connection conn = service.getConnection()) {
-                       try (final Statement stmt = conn.createStatement()) {
-                               stmt.executeUpdate(createPersons);
-                       }
-               }
-
-               runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
-               final Map<String, String> attributes = new HashMap<>();
-               attributes.put("sql.args.1.type", 
String.valueOf(Types.INTEGER));
-               attributes.put("sql.args.1.value", "1");
-
-               attributes.put("sql.args.2.type", 
String.valueOf(Types.VARCHAR));
-               attributes.put("sql.args.2.value", "Mark");
-
-               attributes.put("sql.args.3.type", 
String.valueOf(Types.INTEGER));
-               attributes.put("sql.args.3.value", "84");
-
-               runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, 
?, ?)".getBytes(), attributes);
-               runner.run();
-
-               runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
-
-               try (final Connection conn = service.getConnection()) {
-                       try (final Statement stmt = conn.createStatement()) {
-                               final ResultSet rs = stmt.executeQuery("SELECT 
* FROM PERSONS");
-                               assertTrue(rs.next());
-                               assertEquals(1, rs.getInt(1));
-                               assertEquals("Mark", rs.getString(2));
-                               assertEquals(84, rs.getInt(3));
-                               assertFalse(rs.next());
-                       }
-               }
-
-               runner.clearTransferState();
-
-               attributes.clear();
-               attributes.put("sql.args.1.type", 
String.valueOf(Types.VARCHAR));
-               attributes.put("sql.args.1.value", "George");
-
-               attributes.put("sql.args.2.type", 
String.valueOf(Types.INTEGER));
-               attributes.put("sql.args.2.value", "1");
-
-               runner.enqueue("UPDATE PERSONS SET NAME=? WHERE 
ID=?".getBytes(), attributes);
-               runner.run();
-               runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
-
-               try (final Connection conn = service.getConnection()) {
-                       try (final Statement stmt = conn.createStatement()) {
-                               final ResultSet rs = stmt.executeQuery("SELECT 
* FROM PERSONS");
-                               assertTrue(rs.next());
-                               assertEquals(1, rs.getInt(1));
-                               assertEquals("George", rs.getString(2));
-                               assertEquals(84, rs.getInt(3));
-                               assertFalse(rs.next());
-                       }
-               }
-       }
-
-
-       @Test
-       public void testMultipleStatementsWithinFlowFile() throws 
InitializationException, ProcessException, SQLException, IOException {
-               final TestRunner runner = 
TestRunners.newTestRunner(PutSQL.class);
-               final File tempDir = folder.getRoot();
-               final File dbDir = new File(tempDir, "db");
-               final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-               runner.addControllerService("dbcp", service);
-               runner.enableControllerService(service);
-
-               try (final Connection conn = service.getConnection()) {
-                       try (final Statement stmt = conn.createStatement()) {
-                               stmt.executeUpdate(createPersons);
-                       }
-               }
-
-               runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
-
-               final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES 
(?, ?, ?); " +
-                               "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
-               final Map<String, String> attributes = new HashMap<>();
-               attributes.put("sql.args.1.type", 
String.valueOf(Types.INTEGER));
-               attributes.put("sql.args.1.value", "1");
-
-               attributes.put("sql.args.2.type", 
String.valueOf(Types.VARCHAR));
-               attributes.put("sql.args.2.value", "Mark");
-
-               attributes.put("sql.args.3.type", 
String.valueOf(Types.INTEGER));
-               attributes.put("sql.args.3.value", "84");
-
-               attributes.put("sql.args.4.type", 
String.valueOf(Types.INTEGER));
-               attributes.put("sql.args.4.value", "1");
-
-               runner.enqueue(sql.getBytes(), attributes);
-               runner.run();
-
-               // should fail because of the semicolon
-               runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
-
-               try (final Connection conn = service.getConnection()) {
-                       try (final Statement stmt = conn.createStatement()) {
-                               final ResultSet rs = stmt.executeQuery("SELECT 
* FROM PERSONS");
-                               assertFalse(rs.next());
-                       }
-               }
-       }
-
-
-       /**
-        * Simple implementation only for testing purposes
-        */
-       private static class MockDBCPService extends AbstractControllerService 
implements DBCPService {
-               private final String dbLocation;
-
-               public MockDBCPService(final String dbLocation) {
-                       this.dbLocation = dbLocation;
-               }
-
-               @Override
-               public String getIdentifier() {
-                       return "dbcp";
-               }
-
-               @Override
-               public Connection getConnection() throws ProcessException {
-                       try {
-                               
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-                               final Connection con = 
DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
-                               return con;
-                       } catch (final Exception e) {
-                               e.printStackTrace();
-                               throw new ProcessException("getConnection 
failed: " + e);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json
deleted file mode 100644
index 2f82a94..0000000
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{
-       "id": 1,
-       "name": "Mark",
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json
deleted file mode 100644
index 8f98447..0000000
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json
+++ /dev/null
@@ -1,5 +0,0 @@
-{
-       "id": 1,
-       "name": "Mark",
-       "code": 48
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json
deleted file mode 100644
index 3ff074a..0000000
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{
-       "id": 1,
-       "name": "Mark"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json
deleted file mode 100644
index 347cc9e..0000000
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{
-       "name": "Mark",
-       "code": 48
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7cbc76c..af1c8fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -670,6 +670,11 @@
                 <artifactId>jasypt</artifactId>
                 <version>1.9.2</version>
             </dependency>
+                       <dependency>
+                               <groupId>org.codehaus.jackson</groupId>
+                               <artifactId>jackson-mapper-asl</artifactId>
+                               <version>1.9.13</version>
+                       </dependency>
             <dependency>
                 <groupId>org.apache.spark</groupId>
                 <artifactId>spark-streaming_2.10</artifactId>

Reply via email to