Repository: nifi Updated Branches: refs/heads/master 16dc5d5fd -> 28549c2b1
http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java deleted file mode 100644 index d349a58..0000000 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.sql.Connection; -import java.sql.Date; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Time; -import java.sql.Timestamp; -import java.sql.Types; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.dbcp.DBCPService; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.stream.io.StreamUtils; - -@SupportsBatching -@SeeAlso(ConvertFlatJSONToSQL.class) -@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"}) -@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command " - + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes " - + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be " - + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.") -public class PutSQL extends AbstractProcessor { - - static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder() - .name("JDBC Connection Pool") - .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. " - + "The Connection Pool is necessary in order to determine the appropriate database column types.") - .identifiesControllerService(DBCPService.class) - .required(true) - .build(); - - - static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("A FlowFile is routed to this relationship after the database is successfully updated") - .build(); - static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("A FlowFile is routed to this relationship if the database cannot be updated for any reason") - .build(); - - private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type"); - private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(CONNECTION_POOL); - return properties; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - // TODO: Batch. Pull in 50 or 100 at a time and map content of FlowFile to Set<FlowFile> that have that - // same content. Then execute updates in batches. - - final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); - try (final Connection conn = dbcpService.getConnection()) { - // Read the SQL from the FlowFile's content - final byte[] buffer = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, buffer); - } - }); - - final String sql = new String(buffer, StandardCharsets.UTF_8); - - // Create a prepared statement and set the appropriate parameters on the statement. - try (final PreparedStatement stmt = conn.prepareStatement(sql)) { - for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) { - final String key = entry.getKey(); - final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key); - if (matcher.matches()) { - final int parameterIndex = Integer.parseInt(matcher.group(1)); - - final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches(); - if (!isNumeric) { - getLogger().error("Cannot update database for {} because the value of the '{}' attribute is '{}', which is not a valid JDBC numeral type; routing to failure", - new Object[] {flowFile, key, entry.getValue()}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - final int jdbcType = Integer.parseInt(entry.getValue()); - final String valueAttrName = "sql.args." + parameterIndex + ".value"; - final String parameterValue = flowFile.getAttribute(valueAttrName); - if (parameterValue == null) { - getLogger().error("Cannot update database for {} because the '{}' attribute exists but the '{}' attribute does not", new Object[] {flowFile, key, valueAttrName}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - try { - setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType); - } catch (final NumberFormatException nfe) { - getLogger().error("Cannot update database for {} because the '{}' attribute has a value of '{}', " - + "which cannot be converted into the necessary data type; routing to failure", new Object[] {flowFile, valueAttrName, parameterValue}); - session.transfer(flowFile, REL_FAILURE); - return; - } - } - } - - final int updatedRowCount = stmt.executeUpdate(); - flowFile = session.putAttribute(flowFile, "sql.update.count", String.valueOf(updatedRowCount)); - } - - // TODO: Need to expose Connection URL from DBCP Service and use it to emit a Provenance Event - session.transfer(flowFile, REL_SUCCESS); - } catch (final SQLException e) { - getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - return; - } - } - - private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException { - switch (jdbcType) { - case Types.BIT: - stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue)); - break; - case Types.TINYINT: - stmt.setByte(parameterIndex, Byte.parseByte(parameterValue)); - break; - case Types.SMALLINT: - stmt.setShort(parameterIndex, Short.parseShort(parameterValue)); - break; - case Types.INTEGER: - stmt.setInt(parameterIndex, Integer.parseInt(parameterValue)); - break; - case Types.BIGINT: - stmt.setLong(parameterIndex, Long.parseLong(parameterValue)); - break; - case Types.REAL: - stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue)); - break; - case Types.FLOAT: - case Types.DOUBLE: - stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue)); - break; - case Types.DATE: - stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue))); - break; - case Types.TIME: - stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue))); - break; - case Types.TIMESTAMP: - stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue))); - break; - case Types.CHAR: - case Types.VARCHAR: - case Types.LONGNVARCHAR: - case Types.LONGVARCHAR: - stmt.setString(parameterIndex, parameterValue); - break; - default: - throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue - + "' and a type of '" + jdbcType + "' but this is not a known data type"); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java deleted file mode 100644 index b2bdbba..0000000 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertFlatJSONToSQL.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Paths; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; - -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.dbcp.DBCPService; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -public class TestConvertFlatJSONToSQL { - static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; - - @Rule - public TemporaryFolder folder = new TemporaryFolder(); - - @BeforeClass - public static void setup() { - System.setProperty("derby.stream.error.file", "target/derby.log"); - } - - @Test - public void testInsert() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "INSERT"); - runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json")); - runner.run(); - - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0); - out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.1.value", "1"); - out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); - out.assertAttributeEquals("sql.args.2.value", "Mark"); - out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.3.value", "48"); - - out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); - } - - - @Test - public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json")); - runner.run(); - - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0); - out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR)); - out.assertAttributeEquals("sql.args.1.value", "Mark"); - out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.2.value", "48"); - out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.3.value", "1"); - - out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?"); - } - - - @Test - public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "code"); - runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json")); - runner.run(); - - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0); - out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.1.value", "1"); - out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); - out.assertAttributeEquals("sql.args.2.value", "Mark"); - out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.3.value", "48"); - - out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? WHERE CODE = ?"); - } - - @Test - public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code"); - runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-1.json")); - runner.run(); - - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_ORIGINAL, 1); - runner.assertTransferCount(ConvertFlatJSONToSQL.REL_SQL, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertFlatJSONToSQL.REL_SQL).get(0); - out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.1.value", "1"); - out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); - out.assertAttributeEquals("sql.args.2.value", "Mark"); - out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); - out.assertAttributeEquals("sql.args.3.value", "48"); - - out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?"); - } - - @Test - public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code"); - runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json")); - runner.run(); - - runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1); - } - - @Test - public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "UPDATE"); - runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code"); - runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json")); - runner.run(); - - runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1); - } - - @Test - public void testInsertWithMissingField() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertFlatJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(ConvertFlatJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertFlatJSONToSQL.TABLE_NAME, "PERSONS"); - runner.setProperty(ConvertFlatJSONToSQL.STATEMENT_TYPE, "INSERT"); - runner.setProperty(ConvertFlatJSONToSQL.UPDATE_KEY, "name, code"); - runner.enqueue(Paths.get("src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json")); - runner.run(); - - runner.assertAllFlowFilesTransferred(ConvertFlatJSONToSQL.REL_FAILURE, 1); - } - - - /** - * Simple implementation only for testing purposes - */ - private static class MockDBCPService extends AbstractControllerService implements DBCPService { - private final String dbLocation; - - public MockDBCPService(final String dbLocation) { - this.dbLocation = dbLocation; - } - - @Override - public String getIdentifier() { - return "dbcp"; - } - - @Override - public Connection getConnection() throws ProcessException { - try { - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true"); - return con; - } catch (final Exception e) { - e.printStackTrace(); - throw new ProcessException("getConnection failed: " + e); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java deleted file mode 100644 index ccba3e1..0000000 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Types; -import java.util.HashMap; -import java.util.Map; - -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.dbcp.DBCPService; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -public class TestPutSQL { - static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; - - @Rule - public TemporaryFolder folder = new TemporaryFolder(); - - @BeforeClass - public static void setup() { - System.setProperty("derby.stream.error.file", "target/derby.log"); - } - - @Test - public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes()); - runner.run(); - - runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("Mark", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - - runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes()); - runner.run(); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("George", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - } - - - @Test - public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - final Map<String, String> attributes = new HashMap<>(); - attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.1.value", "1"); - - attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); - attributes.put("sql.args.2.value", "Mark"); - - attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.3.value", "84"); - - runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); - runner.run(); - - runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("Mark", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - - runner.clearTransferState(); - - attributes.clear(); - attributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR)); - attributes.put("sql.args.1.value", "George"); - - attributes.put("sql.args.2.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.2.value", "1"); - - runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), attributes); - runner.run(); - runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("George", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - } - - - @Test - public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - - final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + - "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; - final Map<String, String> attributes = new HashMap<>(); - attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.1.value", "1"); - - attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); - attributes.put("sql.args.2.value", "Mark"); - - attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.3.value", "84"); - - attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.4.value", "1"); - - runner.enqueue(sql.getBytes(), attributes); - runner.run(); - - // should fail because of the semicolon - runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertFalse(rs.next()); - } - } - } - - - /** - * Simple implementation only for testing purposes - */ - private static class MockDBCPService extends AbstractControllerService implements DBCPService { - private final String dbLocation; - - public MockDBCPService(final String dbLocation) { - this.dbLocation = dbLocation; - } - - @Override - public String getIdentifier() { - return "dbcp"; - } - - @Override - public Connection getConnection() throws ProcessException { - try { - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true"); - return con; - } catch (final Exception e) { - e.printStackTrace(); - throw new ProcessException("getConnection failed: " + e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json deleted file mode 100644 index 2f82a94..0000000 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/malformed-person-extra-comma.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "id": 1, - "name": "Mark", -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json deleted file mode 100644 index 8f98447..0000000 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-1.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "id": 1, - "name": "Mark", - "code": 48 -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json deleted file mode 100644 index 3ff074a..0000000 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-code.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "id": 1, - "name": "Mark" -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json deleted file mode 100644 index 347cc9e..0000000 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertFlatJSONToSQL/person-without-id.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "name": "Mark", - "code": 48 -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7cbc76c..af1c8fb 100644 --- a/pom.xml +++ b/pom.xml @@ -670,6 +670,11 @@ <artifactId>jasypt</artifactId> <version>1.9.2</version> </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.9.13</version> + </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId>
