http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java index 63073e6..7422dbc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java @@ -1,431 +1,431 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Paths; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.List; - -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.dbcp.DBCPService; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -public class TestConvertJSONToSQL { - static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; - - @Rule - public TemporaryFolder folder = new TemporaryFolder(); - - @BeforeClass - public static void setup() { - System.setProperty("derby.stream.error.file", "target/derby.log"); - } - - @Test - public void testInsert() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); - runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); - runner.run(); - - runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); - out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.1.value", "1"); - out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); - out.assertAttributeEquals("sql.args.2.value", "Mark"); - out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.3.value", "48"); - - out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); - } - - @Test - public void testInsertWithNullValue() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); - runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json")); - runner.run(); - - runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); - out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.1.value", "1"); - out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); - out.assertAttributeEquals("sql.args.2.value", "Mark"); - out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeNotExists("sql.args.3.value"); - - out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); - } - - - @Test - public void testUpdateWithNullValue() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json")); - runner.run(); - - runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); - out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR)); - out.assertAttributeEquals("sql.args.1.value", "Mark"); - out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeNotExists("sql.args.2.value"); - out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.3.value", "1"); - - out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?"); - } - - - @Test - public void testMultipleInserts() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); - runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/persons.json")); - runner.run(); - - runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5); - final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL); - for (final MockFlowFile mff : mffs) { - mff.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); - - for (int i=1; i <= 3; i++) { - mff.assertAttributeExists("sql.args." + i + ".type"); - mff.assertAttributeExists("sql.args." + i + ".value"); - } - } - } - - @Test - public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); - runner.run(); - - runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); - out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR)); - out.assertAttributeEquals("sql.args.1.value", "Mark"); - out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.2.value", "48"); - out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.3.value", "1"); - - out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?"); - } - - @Test - public void testUnmappedFieldBehavior() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); - runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, ConvertJSONToSQL.IGNORE_UNMATCHED_FIELD.getValue()); - runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json")); - runner.run(); - - runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); - - out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); - - runner.clearTransferState(); - runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_FIELD.getValue()); - runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json")); - runner.run(); - runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); - } - - @Test - public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "code"); - runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); - runner.run(); - - runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); - out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.1.value", "1"); - out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); - out.assertAttributeEquals("sql.args.2.value", "Mark"); - out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.3.value", "48"); - - out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? WHERE CODE = ?"); - } - - @Test - public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code"); - runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); - runner.run(); - - runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); - out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.1.value", "1"); - out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); - out.assertAttributeEquals("sql.args.2.value", "Mark"); - out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.3.value", "48"); - - out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?"); - } - - @Test - public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code"); - runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-code.json")); - runner.run(); - - runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); - } - - @Test - public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code"); - runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json")); - runner.run(); - - runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); - } - - @Test - public void testInsertWithMissingField() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); - runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code"); - runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-id.json")); - runner.run(); - - runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); - } - - - /** - * Simple implementation only for testing purposes - */ - private static class MockDBCPService extends AbstractControllerService implements DBCPService { - private final String dbLocation; - - public MockDBCPService(final String dbLocation) { - this.dbLocation = dbLocation; - } - - @Override - public String getIdentifier() { - return "dbcp"; - } - - @Override - public Connection getConnection() throws ProcessException { - try { - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true"); - return con; - } catch (final Exception e) { - e.printStackTrace(); - throw new ProcessException("getConnection failed: " + e); - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestConvertJSONToSQL { + static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + @Test + public void testInsert() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + } + + @Test + public void testInsertWithNullValue() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeNotExists("sql.args.3.value"); + + out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + } + + + @Test + public void testUpdateWithNullValue() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.1.value", "Mark"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeNotExists("sql.args.2.value"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "1"); + + out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?"); + } + + + @Test + public void testMultipleInserts() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/persons.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5); + final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL); + for (final MockFlowFile mff : mffs) { + mff.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + + for (int i=1; i <= 3; i++) { + mff.assertAttributeExists("sql.args." + i + ".type"); + mff.assertAttributeExists("sql.args." + i + ".value"); + } + } + } + + @Test + public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.1.value", "Mark"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.2.value", "48"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "1"); + + out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?"); + } + + @Test + public void testUnmappedFieldBehavior() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, ConvertJSONToSQL.IGNORE_UNMATCHED_FIELD.getValue()); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + + out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + + runner.clearTransferState(); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_FIELD.getValue()); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json")); + runner.run(); + runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); + } + + @Test + public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? WHERE CODE = ?"); + } + + @Test + public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?"); + } + + @Test + public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-code.json")); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); + } + + @Test + public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json")); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); + } + + @Test + public void testInsertWithMissingField() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-id.json")); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); + } + + + /** + * Simple implementation only for testing purposes + */ + private static class MockDBCPService extends AbstractControllerService implements DBCPService { + private final String dbLocation; + + public MockDBCPService(final String dbLocation) { + this.dbLocation = dbLocation; + } + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true"); + return con; + } catch (final Exception e) { + e.printStackTrace(); + throw new ProcessException("getConnection failed: " + e); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java index 82fee1b..76026bc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java @@ -1,36 +1,36 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Test; - -public class TestDuplicateFlowFile { - - @Test - public void test() { - final TestRunner runner = TestRunners.newTestRunner(DuplicateFlowFile.class); - runner.setProperty(DuplicateFlowFile.NUM_COPIES, "100"); - - runner.enqueue("hello".getBytes()); - runner.run(); - - runner.assertAllFlowFilesTransferred(DuplicateFlowFile.REL_SUCCESS, 101); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +public class TestDuplicateFlowFile { + + @Test + public void test() { + final TestRunner runner = TestRunners.newTestRunner(DuplicateFlowFile.class); + runner.setProperty(DuplicateFlowFile.NUM_COPIES, "100"); + + runner.enqueue("hello".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(DuplicateFlowFile.REL_SUCCESS, 101); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java index a5581b3..8e7bc05 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java @@ -1,163 +1,163 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Paths; -import java.util.Collection; -import java.util.HashSet; - -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.security.util.EncryptionMethod; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.MockProcessContext; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Assert; -import org.junit.Test; - -public class TestEncryptContent { - - @Test - public void testRoundTrip() throws IOException { - final TestRunner testRunner = TestRunners.newTestRunner(new EncryptContent()); - testRunner.setProperty(EncryptContent.PASSWORD, "Hello, World!"); - - for (final EncryptionMethod method : EncryptionMethod.values()) { - if (method.isUnlimitedStrength()) { - continue; // cannot test unlimited strength in unit tests because it's not enabled by the JVM by default. - } - testRunner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, method.name()); - testRunner.setProperty(EncryptContent.MODE, EncryptContent.ENCRYPT_MODE); - - testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); - testRunner.clearTransferState(); - testRunner.run(); - - testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1); - - MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0); - testRunner.assertQueueEmpty(); - - testRunner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE); - testRunner.enqueue(flowFile); - testRunner.clearTransferState(); - testRunner.run(); - testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1); - - flowFile = testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0); - flowFile.assertContentEquals(new File("src/test/resources/hello.txt")); - } - } - - @Test - public void testDecryptSmallerThanSaltSize() { - final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); - runner.setProperty(EncryptContent.PASSWORD, "Hello, World!"); - runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE); - runner.enqueue(new byte[4]); - runner.run(); - runner.assertAllFlowFilesTransferred(EncryptContent.REL_FAILURE, 1); - } - - @Test - public void testPGPDecrypt() throws IOException { - final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); - runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE); - runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, EncryptionMethod.PGP_ASCII_ARMOR.name()); - runner.setProperty(EncryptContent.PASSWORD, "Hello, World!"); - - runner.enqueue(Paths.get("src/test/resources/TestEncryptContent/text.txt.asc")); - runner.run(); - - runner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1); - final MockFlowFile flowFile = runner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0); - flowFile.assertContentEquals(Paths.get("src/test/resources/TestEncryptContent/text.txt")); - } - - @Test - public void testValidation() { - final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); - Collection<ValidationResult> results; - MockProcessContext pc; - - results = new HashSet<>(); - runner.enqueue(new byte[0]); - pc = (MockProcessContext) runner.getProcessContext(); - results = pc.validate(); - Assert.assertEquals(1, results.size()); - for (final ValidationResult vr : results) { - Assert.assertTrue(vr.toString() - .contains(EncryptContent.PASSWORD.getDisplayName() + " is required when using algorithm")); - } - - results = new HashSet<>(); - runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, EncryptionMethod.PGP.name()); - runner.setProperty(EncryptContent.PUBLIC_KEYRING, "src/test/resources/TestEncryptContent/text.txt"); - runner.enqueue(new byte[0]); - pc = (MockProcessContext) runner.getProcessContext(); - results = pc.validate(); - Assert.assertEquals(1, results.size()); - for (final ValidationResult vr : results) { - Assert.assertTrue(vr.toString().contains( - " encryption without a " + EncryptContent.PASSWORD.getDisplayName() + " requires both " - + EncryptContent.PUBLIC_KEYRING.getDisplayName() + " and " - + EncryptContent.PUBLIC_KEY_USERID.getDisplayName())); - } - - results = new HashSet<>(); - runner.setProperty(EncryptContent.PUBLIC_KEY_USERID, "USERID"); - runner.enqueue(new byte[0]); - pc = (MockProcessContext) runner.getProcessContext(); - results = pc.validate(); - Assert.assertEquals(1, results.size()); - for (final ValidationResult vr : results) { - Assert.assertTrue(vr.toString().contains("does not contain user id USERID")); - } - - runner.removeProperty(EncryptContent.PUBLIC_KEYRING); - runner.removeProperty(EncryptContent.PUBLIC_KEY_USERID); - - results = new HashSet<>(); - runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE); - runner.setProperty(EncryptContent.PRIVATE_KEYRING, "src/test/resources/TestEncryptContent/text.txt"); - runner.enqueue(new byte[0]); - pc = (MockProcessContext) runner.getProcessContext(); - results = pc.validate(); - Assert.assertEquals(1, results.size()); - for (final ValidationResult vr : results) { - Assert.assertTrue(vr.toString().contains( - " decryption without a " + EncryptContent.PASSWORD.getDisplayName() + " requires both " - + EncryptContent.PRIVATE_KEYRING.getDisplayName() + " and " - + EncryptContent.PRIVATE_KEYRING_PASSPHRASE.getDisplayName())); - - } - - results = new HashSet<>(); - runner.setProperty(EncryptContent.PRIVATE_KEYRING_PASSPHRASE, "PASSWORD"); - runner.enqueue(new byte[0]); - pc = (MockProcessContext) runner.getProcessContext(); - results = pc.validate(); - Assert.assertEquals(1, results.size()); - for (final ValidationResult vr : results) { - Assert.assertTrue(vr.toString().contains( - " could not be opened with the provided " + EncryptContent.PRIVATE_KEYRING_PASSPHRASE.getDisplayName())); - - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.HashSet; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.security.util.EncryptionMethod; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; + +public class TestEncryptContent { + + @Test + public void testRoundTrip() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new EncryptContent()); + testRunner.setProperty(EncryptContent.PASSWORD, "Hello, World!"); + + for (final EncryptionMethod method : EncryptionMethod.values()) { + if (method.isUnlimitedStrength()) { + continue; // cannot test unlimited strength in unit tests because it's not enabled by the JVM by default. + } + testRunner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, method.name()); + testRunner.setProperty(EncryptContent.MODE, EncryptContent.ENCRYPT_MODE); + + testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); + testRunner.clearTransferState(); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1); + + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0); + testRunner.assertQueueEmpty(); + + testRunner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE); + testRunner.enqueue(flowFile); + testRunner.clearTransferState(); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1); + + flowFile = testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0); + flowFile.assertContentEquals(new File("src/test/resources/hello.txt")); + } + } + + @Test + public void testDecryptSmallerThanSaltSize() { + final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); + runner.setProperty(EncryptContent.PASSWORD, "Hello, World!"); + runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE); + runner.enqueue(new byte[4]); + runner.run(); + runner.assertAllFlowFilesTransferred(EncryptContent.REL_FAILURE, 1); + } + + @Test + public void testPGPDecrypt() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); + runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE); + runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, EncryptionMethod.PGP_ASCII_ARMOR.name()); + runner.setProperty(EncryptContent.PASSWORD, "Hello, World!"); + + runner.enqueue(Paths.get("src/test/resources/TestEncryptContent/text.txt.asc")); + runner.run(); + + runner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0); + flowFile.assertContentEquals(Paths.get("src/test/resources/TestEncryptContent/text.txt")); + } + + @Test + public void testValidation() { + final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); + Collection<ValidationResult> results; + MockProcessContext pc; + + results = new HashSet<>(); + runner.enqueue(new byte[0]); + pc = (MockProcessContext) runner.getProcessContext(); + results = pc.validate(); + Assert.assertEquals(1, results.size()); + for (final ValidationResult vr : results) { + Assert.assertTrue(vr.toString() + .contains(EncryptContent.PASSWORD.getDisplayName() + " is required when using algorithm")); + } + + results = new HashSet<>(); + runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, EncryptionMethod.PGP.name()); + runner.setProperty(EncryptContent.PUBLIC_KEYRING, "src/test/resources/TestEncryptContent/text.txt"); + runner.enqueue(new byte[0]); + pc = (MockProcessContext) runner.getProcessContext(); + results = pc.validate(); + Assert.assertEquals(1, results.size()); + for (final ValidationResult vr : results) { + Assert.assertTrue(vr.toString().contains( + " encryption without a " + EncryptContent.PASSWORD.getDisplayName() + " requires both " + + EncryptContent.PUBLIC_KEYRING.getDisplayName() + " and " + + EncryptContent.PUBLIC_KEY_USERID.getDisplayName())); + } + + results = new HashSet<>(); + runner.setProperty(EncryptContent.PUBLIC_KEY_USERID, "USERID"); + runner.enqueue(new byte[0]); + pc = (MockProcessContext) runner.getProcessContext(); + results = pc.validate(); + Assert.assertEquals(1, results.size()); + for (final ValidationResult vr : results) { + Assert.assertTrue(vr.toString().contains("does not contain user id USERID")); + } + + runner.removeProperty(EncryptContent.PUBLIC_KEYRING); + runner.removeProperty(EncryptContent.PUBLIC_KEY_USERID); + + results = new HashSet<>(); + runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE); + runner.setProperty(EncryptContent.PRIVATE_KEYRING, "src/test/resources/TestEncryptContent/text.txt"); + runner.enqueue(new byte[0]); + pc = (MockProcessContext) runner.getProcessContext(); + results = pc.validate(); + Assert.assertEquals(1, results.size()); + for (final ValidationResult vr : results) { + Assert.assertTrue(vr.toString().contains( + " decryption without a " + EncryptContent.PASSWORD.getDisplayName() + " requires both " + + EncryptContent.PRIVATE_KEYRING.getDisplayName() + " and " + + EncryptContent.PRIVATE_KEYRING_PASSPHRASE.getDisplayName())); + + } + + results = new HashSet<>(); + runner.setProperty(EncryptContent.PRIVATE_KEYRING_PASSPHRASE, "PASSWORD"); + runner.enqueue(new byte[0]); + pc = (MockProcessContext) runner.getProcessContext(); + results = pc.validate(); + Assert.assertEquals(1, results.size()); + for (final ValidationResult vr : results) { + Assert.assertTrue(vr.toString().contains( + " could not be opened with the provided " + EncryptContent.PRIVATE_KEYRING_PASSPHRASE.getDisplayName())); + + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java index 9aa9772..11c47ce 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java @@ -1,128 +1,128 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import javax.servlet.AsyncContext; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.http.HttpContextMap; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.stream.io.NullOutputStream; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Assert; -import org.junit.Test; - -public class TestHandleHttpRequest { - - @Test(timeout=10000) - public void testRequestAddedToService() throws InitializationException, MalformedURLException, IOException, InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class); - runner.setProperty(HandleHttpRequest.PORT, "0"); - - final MockHttpContextMap contextMap = new MockHttpContextMap(); - runner.addControllerService("http-context-map", contextMap); - runner.enableControllerService(contextMap); - runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); - - // trigger processor to stop but not shutdown. - runner.run(1, false); - try { - final Thread httpThread = new Thread(new Runnable() { - @Override - public void run() { - try { - final int port = ((HandleHttpRequest) runner.getProcessor()).getPort(); - final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:" - + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection(); - connection.setDoOutput(false); - connection.setRequestMethod("GET"); - connection.setRequestProperty("header1", "value1"); - connection.setRequestProperty("header2", ""); - connection.setRequestProperty("header3", "apple=orange"); - connection.setConnectTimeout(3000); - connection.setReadTimeout(3000); - - StreamUtils.copy(connection.getInputStream(), new NullOutputStream()); - } catch (final Throwable t) { - t.printStackTrace(); - Assert.fail(t.toString()); - } - } - }); - httpThread.start(); - - while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) { - // process the request. - runner.run(1, false, false); - } - - runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1); - assertEquals(1, contextMap.size()); - - final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0); - mff.assertAttributeEquals("http.query.param.query", "true"); - mff.assertAttributeEquals("http.query.param.value1", "value1"); - mff.assertAttributeEquals("http.query.param.value2", ""); - mff.assertAttributeEquals("http.query.param.value3", ""); - mff.assertAttributeEquals("http.query.param.value4", "apple=orange"); - mff.assertAttributeEquals("http.headers.header1", "value1"); - mff.assertAttributeEquals("http.headers.header3", "apple=orange"); - } finally { - // shut down the server - runner.run(1, true); - } - } - - private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap { - - private final ConcurrentMap<String, HttpServletResponse> responseMap = new ConcurrentHashMap<>(); - - @Override - public boolean register(final String identifier, final HttpServletRequest request, final HttpServletResponse response, final AsyncContext context) { - responseMap.put(identifier, response); - return true; - } - - @Override - public HttpServletResponse getResponse(final String identifier) { - return responseMap.get(identifier); - } - - @Override - public void complete(final String identifier) { - responseMap.remove(identifier); - } - - public int size() { - return responseMap.size(); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.servlet.AsyncContext; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.http.HttpContextMap; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.stream.io.NullOutputStream; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; + +public class TestHandleHttpRequest { + + @Test(timeout=10000) + public void testRequestAddedToService() throws InitializationException, MalformedURLException, IOException, InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class); + runner.setProperty(HandleHttpRequest.PORT, "0"); + + final MockHttpContextMap contextMap = new MockHttpContextMap(); + runner.addControllerService("http-context-map", contextMap); + runner.enableControllerService(contextMap); + runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); + + // trigger processor to stop but not shutdown. + runner.run(1, false); + try { + final Thread httpThread = new Thread(new Runnable() { + @Override + public void run() { + try { + final int port = ((HandleHttpRequest) runner.getProcessor()).getPort(); + final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:" + + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection(); + connection.setDoOutput(false); + connection.setRequestMethod("GET"); + connection.setRequestProperty("header1", "value1"); + connection.setRequestProperty("header2", ""); + connection.setRequestProperty("header3", "apple=orange"); + connection.setConnectTimeout(3000); + connection.setReadTimeout(3000); + + StreamUtils.copy(connection.getInputStream(), new NullOutputStream()); + } catch (final Throwable t) { + t.printStackTrace(); + Assert.fail(t.toString()); + } + } + }); + httpThread.start(); + + while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) { + // process the request. + runner.run(1, false, false); + } + + runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1); + assertEquals(1, contextMap.size()); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0); + mff.assertAttributeEquals("http.query.param.query", "true"); + mff.assertAttributeEquals("http.query.param.value1", "value1"); + mff.assertAttributeEquals("http.query.param.value2", ""); + mff.assertAttributeEquals("http.query.param.value3", ""); + mff.assertAttributeEquals("http.query.param.value4", "apple=orange"); + mff.assertAttributeEquals("http.headers.header1", "value1"); + mff.assertAttributeEquals("http.headers.header3", "apple=orange"); + } finally { + // shut down the server + runner.run(1, true); + } + } + + private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap { + + private final ConcurrentMap<String, HttpServletResponse> responseMap = new ConcurrentHashMap<>(); + + @Override + public boolean register(final String identifier, final HttpServletRequest request, final HttpServletResponse response, final AsyncContext context) { + responseMap.put(identifier, response); + return true; + } + + @Override + public HttpServletResponse getResponse(final String identifier) { + return responseMap.get(identifier); + } + + @Override + public void complete(final String identifier) { + responseMap.remove(identifier); + } + + public int size() { + return responseMap.size(); + } + } +}
