http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
index 63073e6..7422dbc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -1,431 +1,431 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.List;
-
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-public class TestConvertJSONToSQL {
-    static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
-
-    @Rule
-    public TemporaryFolder folder = new TemporaryFolder();
-
-    @BeforeClass
-    public static void setup() {
-        System.setProperty("derby.stream.error.file", "target/derby.log");
-    }
-
-    @Test
-    public void testInsert() throws InitializationException, ProcessException, 
SQLException, IOException {
-        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-        runner.addControllerService("dbcp", service);
-        runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
-        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
-        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
-        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
-        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
-        runner.run();
-
-        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
-        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
-        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
-        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
-        out.assertAttributeEquals("sql.args.1.value", "1");
-        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
-        out.assertAttributeEquals("sql.args.2.value", "Mark");
-        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
-        out.assertAttributeEquals("sql.args.3.value", "48");
-
-        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
-    }
-
-    @Test
-    public void testInsertWithNullValue() throws InitializationException, 
ProcessException, SQLException, IOException {
-        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-        runner.addControllerService("dbcp", service);
-        runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
-        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
-        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
-        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
-        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json"));
-        runner.run();
-
-        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
-        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
-        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
-        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
-        out.assertAttributeEquals("sql.args.1.value", "1");
-        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
-        out.assertAttributeEquals("sql.args.2.value", "Mark");
-        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
-        out.assertAttributeNotExists("sql.args.3.value");
-
-        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
-    }
-
-
-    @Test
-    public void testUpdateWithNullValue() throws InitializationException, 
ProcessException, SQLException, IOException {
-        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-        runner.addControllerService("dbcp", service);
-        runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
-        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
-        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
-        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
-        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json"));
-        runner.run();
-
-        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
-        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
-        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
-        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.VARCHAR));
-        out.assertAttributeEquals("sql.args.1.value", "Mark");
-        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.INTEGER));
-        out.assertAttributeNotExists("sql.args.2.value");
-        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
-        out.assertAttributeEquals("sql.args.3.value", "1");
-
-        out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?");
-    }
-
-
-    @Test
-    public void testMultipleInserts() throws InitializationException, 
ProcessException, SQLException, IOException {
-        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-        runner.addControllerService("dbcp", service);
-        runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
-        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
-        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
-        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
-        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/persons.json"));
-        runner.run();
-
-        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
-        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5);
-        final List<MockFlowFile> mffs = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL);
-        for (final MockFlowFile mff : mffs) {
-            mff.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
-
-            for (int i=1; i <= 3; i++) {
-                mff.assertAttributeExists("sql.args." + i + ".type");
-                mff.assertAttributeExists("sql.args." + i + ".value");
-            }
-        }
-    }
-
-    @Test
-    public void testUpdateBasedOnPrimaryKey() throws InitializationException, 
ProcessException, SQLException, IOException {
-        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-        runner.addControllerService("dbcp", service);
-        runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
-        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
-        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
-        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
-        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
-        runner.run();
-
-        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
-        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
-        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
-        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.VARCHAR));
-        out.assertAttributeEquals("sql.args.1.value", "Mark");
-        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.INTEGER));
-        out.assertAttributeEquals("sql.args.2.value", "48");
-        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
-        out.assertAttributeEquals("sql.args.3.value", "1");
-
-        out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?");
-    }
-
-    @Test
-    public void testUnmappedFieldBehavior() throws InitializationException, 
ProcessException, SQLException, IOException {
-        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-        runner.addControllerService("dbcp", service);
-        runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
-        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
-        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
-        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
-        runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, 
ConvertJSONToSQL.IGNORE_UNMATCHED_FIELD.getValue());
-        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json"));
-        runner.run();
-
-        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
-        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
-        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
-
-        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
-
-        runner.clearTransferState();
-        runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, 
ConvertJSONToSQL.FAIL_UNMATCHED_FIELD.getValue());
-        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json"));
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
-    }
-
-    @Test
-    public void testUpdateBasedOnUpdateKey() throws InitializationException, 
ProcessException, SQLException, IOException {
-        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-        runner.addControllerService("dbcp", service);
-        runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
-        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
-        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
-        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
-        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "code");
-        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
-        runner.run();
-
-        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
-        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
-        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
-        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
-        out.assertAttributeEquals("sql.args.1.value", "1");
-        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
-        out.assertAttributeEquals("sql.args.2.value", "Mark");
-        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
-        out.assertAttributeEquals("sql.args.3.value", "48");
-
-        out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? WHERE CODE = ?");
-    }
-
-    @Test
-    public void testUpdateBasedOnCompoundUpdateKey() throws 
InitializationException, ProcessException, SQLException, IOException {
-        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-        runner.addControllerService("dbcp", service);
-        runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
-        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
-        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
-        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
-        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code");
-        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
-        runner.run();
-
-        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
-        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
-        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
-        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
-        out.assertAttributeEquals("sql.args.1.value", "1");
-        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
-        out.assertAttributeEquals("sql.args.2.value", "Mark");
-        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
-        out.assertAttributeEquals("sql.args.3.value", "48");
-
-        out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
-    }
-
-    @Test
-    public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws 
InitializationException, ProcessException, SQLException, IOException {
-        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-        runner.addControllerService("dbcp", service);
-        runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
-        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
-        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
-        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
-        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code");
-        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-code.json"));
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
-    }
-
-    @Test
-    public void testUpdateWithMalformedJson() throws InitializationException, 
ProcessException, SQLException, IOException {
-        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-        runner.addControllerService("dbcp", service);
-        runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
-        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
-        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
-        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
-        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code");
-        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json"));
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
-    }
-
-    @Test
-    public void testInsertWithMissingField() throws InitializationException, 
ProcessException, SQLException, IOException {
-        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-        runner.addControllerService("dbcp", service);
-        runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
-        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
-        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
-        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
-        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code");
-        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-id.json"));
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
-    }
-
-
-    /**
-     * Simple implementation only for testing purposes
-     */
-    private static class MockDBCPService extends AbstractControllerService 
implements DBCPService {
-        private final String dbLocation;
-
-        public MockDBCPService(final String dbLocation) {
-            this.dbLocation = dbLocation;
-        }
-
-        @Override
-        public String getIdentifier() {
-            return "dbcp";
-        }
-
-        @Override
-        public Connection getConnection() throws ProcessException {
-            try {
-                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-                final Connection con = 
DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
-                return con;
-            } catch (final Exception e) {
-                e.printStackTrace();
-                throw new ProcessException("getConnection failed: " + e);
-            }
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestConvertJSONToSQL {
+    static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
+    @BeforeClass
+    public static void setup() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+    }
+
+    @Test
+    public void testInsert() throws InitializationException, ProcessException, 
SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
+    }
+
+    @Test
+    public void testInsertWithNullValue() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeNotExists("sql.args.3.value");
+
+        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
+    }
+
+
+    @Test
+    public void testUpdateWithNullValue() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.1.value", "Mark");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeNotExists("sql.args.2.value");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "1");
+
+        out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?");
+    }
+
+
+    @Test
+    public void testMultipleInserts() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/persons.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5);
+        final List<MockFlowFile> mffs = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL);
+        for (final MockFlowFile mff : mffs) {
+            mff.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
+
+            for (int i=1; i <= 3; i++) {
+                mff.assertAttributeExists("sql.args." + i + ".type");
+                mff.assertAttributeExists("sql.args." + i + ".value");
+            }
+        }
+    }
+
+    @Test
+    public void testUpdateBasedOnPrimaryKey() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.1.value", "Mark");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.2.value", "48");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "1");
+
+        out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE 
ID = ?");
+    }
+
+    @Test
+    public void testUnmappedFieldBehavior() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, 
ConvertJSONToSQL.IGNORE_UNMATCHED_FIELD.getValue());
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+
+        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES 
(?, ?, ?)");
+
+        runner.clearTransferState();
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, 
ConvertJSONToSQL.FAIL_UNMATCHED_FIELD.getValue());
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testUpdateBasedOnUpdateKey() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "code");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? WHERE 
CODE = ?");
+    }
+
+    @Test
+    public void testUpdateBasedOnCompoundUpdateKey() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND 
CODE = ?");
+    }
+
+    @Test
+    public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-code.json"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testUpdateWithMalformedJson() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testInsertWithMissingField() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-id.json"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+    }
+
+
+    /**
+     * Simple implementation only for testing purposes
+     */
+    private static class MockDBCPService extends AbstractControllerService 
implements DBCPService {
+        private final String dbLocation;
+
+        public MockDBCPService(final String dbLocation) {
+            this.dbLocation = dbLocation;
+        }
+
+        @Override
+        public String getIdentifier() {
+            return "dbcp";
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                final Connection con = 
DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
+                return con;
+            } catch (final Exception e) {
+                e.printStackTrace();
+                throw new ProcessException("getConnection failed: " + e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java
index 82fee1b..76026bc 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java
@@ -1,36 +1,36 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-
-public class TestDuplicateFlowFile {
-
-    @Test
-    public void test() {
-        final TestRunner runner = 
TestRunners.newTestRunner(DuplicateFlowFile.class);
-        runner.setProperty(DuplicateFlowFile.NUM_COPIES, "100");
-
-        runner.enqueue("hello".getBytes());
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(DuplicateFlowFile.REL_SUCCESS, 
101);
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestDuplicateFlowFile {
+
+    @Test
+    public void test() {
+        final TestRunner runner = 
TestRunners.newTestRunner(DuplicateFlowFile.class);
+        runner.setProperty(DuplicateFlowFile.NUM_COPIES, "100");
+
+        runner.enqueue("hello".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(DuplicateFlowFile.REL_SUCCESS, 
101);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java
index a5581b3..8e7bc05 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java
@@ -1,163 +1,163 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.Collection;
-import java.util.HashSet;
-
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.security.util.EncryptionMethod;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.MockProcessContext;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestEncryptContent {
-
-    @Test
-    public void testRoundTrip() throws IOException {
-        final TestRunner testRunner = TestRunners.newTestRunner(new 
EncryptContent());
-        testRunner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
-
-        for (final EncryptionMethod method : EncryptionMethod.values()) {
-            if (method.isUnlimitedStrength()) {
-                continue;   // cannot test unlimited strength in unit tests 
because it's not enabled by the JVM by default.
-            }
-            testRunner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, 
method.name());
-            testRunner.setProperty(EncryptContent.MODE, 
EncryptContent.ENCRYPT_MODE);
-
-            testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
-            testRunner.clearTransferState();
-            testRunner.run();
-
-            
testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1);
-
-            MockFlowFile flowFile = 
testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0);
-            testRunner.assertQueueEmpty();
-
-            testRunner.setProperty(EncryptContent.MODE, 
EncryptContent.DECRYPT_MODE);
-            testRunner.enqueue(flowFile);
-            testRunner.clearTransferState();
-            testRunner.run();
-            
testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1);
-
-            flowFile = 
testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0);
-            flowFile.assertContentEquals(new 
File("src/test/resources/hello.txt"));
-        }
-    }
-
-    @Test
-    public void testDecryptSmallerThanSaltSize() {
-        final TestRunner runner = 
TestRunners.newTestRunner(EncryptContent.class);
-        runner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
-        runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
-        runner.enqueue(new byte[4]);
-        runner.run();
-        runner.assertAllFlowFilesTransferred(EncryptContent.REL_FAILURE, 1);
-    }
-
-    @Test
-    public void testPGPDecrypt() throws IOException {
-        final TestRunner runner = 
TestRunners.newTestRunner(EncryptContent.class);
-        runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
-        runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, 
EncryptionMethod.PGP_ASCII_ARMOR.name());
-        runner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
-
-        
runner.enqueue(Paths.get("src/test/resources/TestEncryptContent/text.txt.asc"));
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1);
-        final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0);
-        
flowFile.assertContentEquals(Paths.get("src/test/resources/TestEncryptContent/text.txt"));
-    }
-
-    @Test
-    public void testValidation() {
-        final TestRunner runner = 
TestRunners.newTestRunner(EncryptContent.class);
-        Collection<ValidationResult> results;
-        MockProcessContext pc;
-
-        results = new HashSet<>();
-        runner.enqueue(new byte[0]);
-        pc = (MockProcessContext) runner.getProcessContext();
-        results = pc.validate();
-        Assert.assertEquals(1, results.size());
-        for (final ValidationResult vr : results) {
-            Assert.assertTrue(vr.toString()
-                    .contains(EncryptContent.PASSWORD.getDisplayName() + " is 
required when using algorithm"));
-        }
-
-        results = new HashSet<>();
-        runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, 
EncryptionMethod.PGP.name());
-        runner.setProperty(EncryptContent.PUBLIC_KEYRING, 
"src/test/resources/TestEncryptContent/text.txt");
-        runner.enqueue(new byte[0]);
-        pc = (MockProcessContext) runner.getProcessContext();
-        results = pc.validate();
-        Assert.assertEquals(1, results.size());
-        for (final ValidationResult vr : results) {
-            Assert.assertTrue(vr.toString().contains(
-                    " encryption without a " + 
EncryptContent.PASSWORD.getDisplayName() + " requires both "
-                            + EncryptContent.PUBLIC_KEYRING.getDisplayName() + 
" and "
-                            + 
EncryptContent.PUBLIC_KEY_USERID.getDisplayName()));
-        }
-
-        results = new HashSet<>();
-        runner.setProperty(EncryptContent.PUBLIC_KEY_USERID, "USERID");
-        runner.enqueue(new byte[0]);
-        pc = (MockProcessContext) runner.getProcessContext();
-        results = pc.validate();
-        Assert.assertEquals(1, results.size());
-        for (final ValidationResult vr : results) {
-            Assert.assertTrue(vr.toString().contains("does not contain user id 
USERID"));
-        }
-
-        runner.removeProperty(EncryptContent.PUBLIC_KEYRING);
-        runner.removeProperty(EncryptContent.PUBLIC_KEY_USERID);
-
-        results = new HashSet<>();
-        runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
-        runner.setProperty(EncryptContent.PRIVATE_KEYRING, 
"src/test/resources/TestEncryptContent/text.txt");
-        runner.enqueue(new byte[0]);
-        pc = (MockProcessContext) runner.getProcessContext();
-        results = pc.validate();
-        Assert.assertEquals(1, results.size());
-        for (final ValidationResult vr : results) {
-            Assert.assertTrue(vr.toString().contains(
-                    " decryption without a " + 
EncryptContent.PASSWORD.getDisplayName() + " requires both "
-                    + EncryptContent.PRIVATE_KEYRING.getDisplayName() + " and "
-                    + 
EncryptContent.PRIVATE_KEYRING_PASSPHRASE.getDisplayName()));
-
-        }
-
-        results = new HashSet<>();
-        runner.setProperty(EncryptContent.PRIVATE_KEYRING_PASSPHRASE, 
"PASSWORD");
-        runner.enqueue(new byte[0]);
-        pc = (MockProcessContext) runner.getProcessContext();
-        results = pc.validate();
-        Assert.assertEquals(1, results.size());
-        for (final ValidationResult vr : results) {
-            Assert.assertTrue(vr.toString().contains(
-                    " could not be opened with the provided " + 
EncryptContent.PRIVATE_KEYRING_PASSPHRASE.getDisplayName()));
-
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.HashSet;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.security.util.EncryptionMethod;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestEncryptContent {
+
+    @Test
+    public void testRoundTrip() throws IOException {
+        final TestRunner testRunner = TestRunners.newTestRunner(new 
EncryptContent());
+        testRunner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
+
+        for (final EncryptionMethod method : EncryptionMethod.values()) {
+            if (method.isUnlimitedStrength()) {
+                continue;   // cannot test unlimited strength in unit tests 
because it's not enabled by the JVM by default.
+            }
+            testRunner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, 
method.name());
+            testRunner.setProperty(EncryptContent.MODE, 
EncryptContent.ENCRYPT_MODE);
+
+            testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
+            testRunner.clearTransferState();
+            testRunner.run();
+
+            
testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1);
+
+            MockFlowFile flowFile = 
testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0);
+            testRunner.assertQueueEmpty();
+
+            testRunner.setProperty(EncryptContent.MODE, 
EncryptContent.DECRYPT_MODE);
+            testRunner.enqueue(flowFile);
+            testRunner.clearTransferState();
+            testRunner.run();
+            
testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1);
+
+            flowFile = 
testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0);
+            flowFile.assertContentEquals(new 
File("src/test/resources/hello.txt"));
+        }
+    }
+
+    @Test
+    public void testDecryptSmallerThanSaltSize() {
+        final TestRunner runner = 
TestRunners.newTestRunner(EncryptContent.class);
+        runner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
+        runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
+        runner.enqueue(new byte[4]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(EncryptContent.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testPGPDecrypt() throws IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(EncryptContent.class);
+        runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
+        runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, 
EncryptionMethod.PGP_ASCII_ARMOR.name());
+        runner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
+
+        
runner.enqueue(Paths.get("src/test/resources/TestEncryptContent/text.txt.asc"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0);
+        
flowFile.assertContentEquals(Paths.get("src/test/resources/TestEncryptContent/text.txt"));
+    }
+
+    @Test
+    public void testValidation() {
+        final TestRunner runner = 
TestRunners.newTestRunner(EncryptContent.class);
+        Collection<ValidationResult> results;
+        MockProcessContext pc;
+
+        results = new HashSet<>();
+        runner.enqueue(new byte[0]);
+        pc = (MockProcessContext) runner.getProcessContext();
+        results = pc.validate();
+        Assert.assertEquals(1, results.size());
+        for (final ValidationResult vr : results) {
+            Assert.assertTrue(vr.toString()
+                    .contains(EncryptContent.PASSWORD.getDisplayName() + " is 
required when using algorithm"));
+        }
+
+        results = new HashSet<>();
+        runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, 
EncryptionMethod.PGP.name());
+        runner.setProperty(EncryptContent.PUBLIC_KEYRING, 
"src/test/resources/TestEncryptContent/text.txt");
+        runner.enqueue(new byte[0]);
+        pc = (MockProcessContext) runner.getProcessContext();
+        results = pc.validate();
+        Assert.assertEquals(1, results.size());
+        for (final ValidationResult vr : results) {
+            Assert.assertTrue(vr.toString().contains(
+                    " encryption without a " + 
EncryptContent.PASSWORD.getDisplayName() + " requires both "
+                            + EncryptContent.PUBLIC_KEYRING.getDisplayName() + 
" and "
+                            + 
EncryptContent.PUBLIC_KEY_USERID.getDisplayName()));
+        }
+
+        results = new HashSet<>();
+        runner.setProperty(EncryptContent.PUBLIC_KEY_USERID, "USERID");
+        runner.enqueue(new byte[0]);
+        pc = (MockProcessContext) runner.getProcessContext();
+        results = pc.validate();
+        Assert.assertEquals(1, results.size());
+        for (final ValidationResult vr : results) {
+            Assert.assertTrue(vr.toString().contains("does not contain user id 
USERID"));
+        }
+
+        runner.removeProperty(EncryptContent.PUBLIC_KEYRING);
+        runner.removeProperty(EncryptContent.PUBLIC_KEY_USERID);
+
+        results = new HashSet<>();
+        runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE);
+        runner.setProperty(EncryptContent.PRIVATE_KEYRING, 
"src/test/resources/TestEncryptContent/text.txt");
+        runner.enqueue(new byte[0]);
+        pc = (MockProcessContext) runner.getProcessContext();
+        results = pc.validate();
+        Assert.assertEquals(1, results.size());
+        for (final ValidationResult vr : results) {
+            Assert.assertTrue(vr.toString().contains(
+                    " decryption without a " + 
EncryptContent.PASSWORD.getDisplayName() + " requires both "
+                    + EncryptContent.PRIVATE_KEYRING.getDisplayName() + " and "
+                    + 
EncryptContent.PRIVATE_KEYRING_PASSPHRASE.getDisplayName()));
+
+        }
+
+        results = new HashSet<>();
+        runner.setProperty(EncryptContent.PRIVATE_KEYRING_PASSPHRASE, 
"PASSWORD");
+        runner.enqueue(new byte[0]);
+        pc = (MockProcessContext) runner.getProcessContext();
+        results = pc.validate();
+        Assert.assertEquals(1, results.size());
+        for (final ValidationResult vr : results) {
+            Assert.assertTrue(vr.toString().contains(
+                    " could not be opened with the provided " + 
EncryptContent.PRIVATE_KEYRING_PASSPHRASE.getDisplayName()));
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
index 9aa9772..11c47ce 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
@@ -1,128 +1,128 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.servlet.AsyncContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.http.HttpContextMap;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.stream.io.NullOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestHandleHttpRequest {
-
-    @Test(timeout=10000)
-    public void testRequestAddedToService() throws InitializationException, 
MalformedURLException, IOException, InterruptedException {
-        final TestRunner runner = 
TestRunners.newTestRunner(HandleHttpRequest.class);
-        runner.setProperty(HandleHttpRequest.PORT, "0");
-
-        final MockHttpContextMap contextMap = new MockHttpContextMap();
-        runner.addControllerService("http-context-map", contextMap);
-        runner.enableControllerService(contextMap);
-        runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, 
"http-context-map");
-
-        // trigger processor to stop but not shutdown.
-        runner.run(1, false);
-        try {
-            final Thread httpThread = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        final int port = ((HandleHttpRequest) 
runner.getProcessor()).getPort();
-                        final HttpURLConnection connection = 
(HttpURLConnection) new URL("http://localhost:"
-                                + port + 
"/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
-                        connection.setDoOutput(false);
-                        connection.setRequestMethod("GET");
-                        connection.setRequestProperty("header1", "value1");
-                        connection.setRequestProperty("header2", "");
-                        connection.setRequestProperty("header3", 
"apple=orange");
-                        connection.setConnectTimeout(3000);
-                        connection.setReadTimeout(3000);
-
-                        StreamUtils.copy(connection.getInputStream(), new 
NullOutputStream());
-                    } catch (final Throwable t) {
-                        t.printStackTrace();
-                        Assert.fail(t.toString());
-                    }
-                }
-            });
-            httpThread.start();
-
-            while ( 
runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
-                // process the request.
-                runner.run(1, false, false);
-            }
-
-            
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
-            assertEquals(1, contextMap.size());
-
-            final MockFlowFile mff = 
runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
-            mff.assertAttributeEquals("http.query.param.query", "true");
-            mff.assertAttributeEquals("http.query.param.value1", "value1");
-            mff.assertAttributeEquals("http.query.param.value2", "");
-            mff.assertAttributeEquals("http.query.param.value3", "");
-            mff.assertAttributeEquals("http.query.param.value4", 
"apple=orange");
-            mff.assertAttributeEquals("http.headers.header1", "value1");
-            mff.assertAttributeEquals("http.headers.header3", "apple=orange");
-        } finally {
-            // shut down the server
-            runner.run(1, true);
-        }
-    }
-
-    private static class MockHttpContextMap extends AbstractControllerService 
implements HttpContextMap {
-
-        private final ConcurrentMap<String, HttpServletResponse> responseMap = 
new ConcurrentHashMap<>();
-
-        @Override
-        public boolean register(final String identifier, final 
HttpServletRequest request, final HttpServletResponse response, final 
AsyncContext context) {
-            responseMap.put(identifier, response);
-            return true;
-        }
-
-        @Override
-        public HttpServletResponse getResponse(final String identifier) {
-            return responseMap.get(identifier);
-        }
-
-        @Override
-        public void complete(final String identifier) {
-            responseMap.remove(identifier);
-        }
-
-        public int size() {
-            return responseMap.size();
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.http.HttpContextMap;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHandleHttpRequest {
+
+    @Test(timeout=10000)
+    public void testRequestAddedToService() throws InitializationException, 
MalformedURLException, IOException, InterruptedException {
+        final TestRunner runner = 
TestRunners.newTestRunner(HandleHttpRequest.class);
+        runner.setProperty(HandleHttpRequest.PORT, "0");
+
+        final MockHttpContextMap contextMap = new MockHttpContextMap();
+        runner.addControllerService("http-context-map", contextMap);
+        runner.enableControllerService(contextMap);
+        runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, 
"http-context-map");
+
+        // trigger processor to stop but not shutdown.
+        runner.run(1, false);
+        try {
+            final Thread httpThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        final int port = ((HandleHttpRequest) 
runner.getProcessor()).getPort();
+                        final HttpURLConnection connection = 
(HttpURLConnection) new URL("http://localhost:"
+                                + port + 
"/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
+                        connection.setDoOutput(false);
+                        connection.setRequestMethod("GET");
+                        connection.setRequestProperty("header1", "value1");
+                        connection.setRequestProperty("header2", "");
+                        connection.setRequestProperty("header3", 
"apple=orange");
+                        connection.setConnectTimeout(3000);
+                        connection.setReadTimeout(3000);
+
+                        StreamUtils.copy(connection.getInputStream(), new 
NullOutputStream());
+                    } catch (final Throwable t) {
+                        t.printStackTrace();
+                        Assert.fail(t.toString());
+                    }
+                }
+            });
+            httpThread.start();
+
+            while ( 
runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
+                // process the request.
+                runner.run(1, false, false);
+            }
+
+            
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
+            assertEquals(1, contextMap.size());
+
+            final MockFlowFile mff = 
runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
+            mff.assertAttributeEquals("http.query.param.query", "true");
+            mff.assertAttributeEquals("http.query.param.value1", "value1");
+            mff.assertAttributeEquals("http.query.param.value2", "");
+            mff.assertAttributeEquals("http.query.param.value3", "");
+            mff.assertAttributeEquals("http.query.param.value4", 
"apple=orange");
+            mff.assertAttributeEquals("http.headers.header1", "value1");
+            mff.assertAttributeEquals("http.headers.header3", "apple=orange");
+        } finally {
+            // shut down the server
+            runner.run(1, true);
+        }
+    }
+
+    private static class MockHttpContextMap extends AbstractControllerService 
implements HttpContextMap {
+
+        private final ConcurrentMap<String, HttpServletResponse> responseMap = 
new ConcurrentHashMap<>();
+
+        @Override
+        public boolean register(final String identifier, final 
HttpServletRequest request, final HttpServletResponse response, final 
AsyncContext context) {
+            responseMap.put(identifier, response);
+            return true;
+        }
+
+        @Override
+        public HttpServletResponse getResponse(final String identifier) {
+            return responseMap.get(identifier);
+        }
+
+        @Override
+        public void complete(final String identifier) {
+            responseMap.remove(identifier);
+        }
+
+        public int size() {
+            return responseMap.size();
+        }
+    }
+}

Reply via email to