Repository: nifi Updated Branches: refs/heads/NIFI-1054 [created] e2d3d1b76
http://git-wip-us.apache.org/repos/asf/nifi/blob/e2d3d1b7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java index 17506f7..301ad75 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -1,700 +1,700 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processors.standard; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Types; -import java.util.HashMap; -import java.util.Map; - -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.dbcp.DBCPService; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.mockito.Mockito; - -public class TestPutSQL { - private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; - private static final String createPersonsAutoId = "CREATE TABLE PERSONS (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1), name VARCHAR(100), code INTEGER check(code <= 100))"; - - @Rule - public TemporaryFolder folder = new TemporaryFolder(); - - @BeforeClass - public static void setup() { - System.setProperty("derby.stream.error.file", "target/derby.log"); - } - - @Test - public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = 
service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes()); - runner.run(); - - runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("Mark", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - - runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes()); - runner.run(); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("George", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - } - - - @Test - public void testInsertWithGeneratedKeys() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersonsAutoId); - } - } - - runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true"); - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes()); - runner.run(); - 
- runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); - final MockFlowFile mff = runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS).get(0); - mff.assertAttributeEquals("sql.generated.key", "1"); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("Mark", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - } - - - @Test - public void testFailInMiddleWithBadStatement() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersonsAutoId); - } - } - - runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes()); - runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax - runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes()); - runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes()); - runner.run(); - - runner.assertTransferCount(PutSQL.REL_FAILURE, 1); - runner.assertTransferCount(PutSQL.REL_SUCCESS, 3); - } - - - @Test - public void testFailInMiddleWithBadParameterType() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File 
tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersonsAutoId); - } - } - - runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - - final Map<String, String> goodAttributes = new HashMap<>(); - goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); - goodAttributes.put("sql.args.1.value", "84"); - - final Map<String, String> badAttributes = new HashMap<>(); - badAttributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR)); - badAttributes.put("sql.args.1.value", "hello"); - - final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes(); - runner.enqueue(data, goodAttributes); - runner.enqueue(data, badAttributes); - runner.enqueue(data, goodAttributes); - runner.enqueue(data, goodAttributes); - runner.run(); - - runner.assertTransferCount(PutSQL.REL_FAILURE, 1); - runner.assertTransferCount(PutSQL.REL_SUCCESS, 3); - } - - - @Test - public void testFailInMiddleWithBadParameterValue() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersonsAutoId); - } - } - - runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); - 
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - - final Map<String, String> goodAttributes = new HashMap<>(); - goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); - goodAttributes.put("sql.args.1.value", "84"); - - final Map<String, String> badAttributes = new HashMap<>(); - badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); - badAttributes.put("sql.args.1.value", "9999"); - - final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes(); - runner.enqueue(data, goodAttributes); - runner.enqueue(data, badAttributes); - runner.enqueue(data, goodAttributes); - runner.enqueue(data, goodAttributes); - runner.run(); - - runner.assertTransferCount(PutSQL.REL_SUCCESS, 1); - runner.assertTransferCount(PutSQL.REL_FAILURE, 1); - runner.assertTransferCount(PutSQL.REL_RETRY, 2); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("Mark", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - } - - - @Test - public void testUsingSqlDataTypesWithNegativeValues() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate("CREATE TABLE PERSONS (id integer primary key, name varchar(100), code bigint)"); - } - } - - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - final Map<String, String> attributes = new 
HashMap<>(); - attributes.put("sql.args.1.type", "-5"); - attributes.put("sql.args.1.value", "84"); - runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', ?)".getBytes(), attributes); - runner.run(); - - runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("Mark", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - } - - @Test - public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - final Map<String, String> attributes = new HashMap<>(); - attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.1.value", "1"); - - attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); - attributes.put("sql.args.2.value", "Mark"); - - attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.3.value", "84"); - - runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); - runner.run(); - - runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = 
conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("Mark", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - - runner.clearTransferState(); - - attributes.clear(); - attributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR)); - attributes.put("sql.args.1.value", "George"); - - attributes.put("sql.args.2.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.2.value", "1"); - - runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), attributes); - runner.run(); - runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("George", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - } - - - @Test - public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - - final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + - "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; - final Map<String, String> attributes = new HashMap<>(); - attributes.put("sql.args.1.type", 
String.valueOf(Types.INTEGER)); - attributes.put("sql.args.1.value", "1"); - - attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); - attributes.put("sql.args.2.value", "Mark"); - - attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.3.value", "84"); - - attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.4.value", "1"); - - runner.enqueue(sql.getBytes(), attributes); - runner.run(); - - // should fail because of the semicolon - runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertFalse(rs.next()); - } - } - } - - - @Test - public void testWithNullParameter() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - final Map<String, String> attributes = new HashMap<>(); - attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.1.value", "1"); - - attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); - attributes.put("sql.args.2.value", "Mark"); - - attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); - - runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); - runner.run(); - - 
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("Mark", rs.getString(2)); - assertEquals(0, rs.getInt(3)); - assertFalse(rs.next()); - } - } - } - - @Test - public void testInvalidStatement() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - - final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + - "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; "; - final Map<String, String> attributes = new HashMap<>(); - attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.1.value", "1"); - - attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); - attributes.put("sql.args.2.value", "Mark"); - - attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.3.value", "84"); - - attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.4.value", "1"); - - runner.enqueue(sql.getBytes(), attributes); - runner.run(); - - // should fail because of the semicolon - runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = 
conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertFalse(rs.next()); - } - } - } - - - @Test - public void testRetryableFailure() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final DBCPService service = new SQLExceptionService(null); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - - final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + - "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; - final Map<String, String> attributes = new HashMap<>(); - attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.1.value", "1"); - - attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); - attributes.put("sql.args.2.value", "Mark"); - - attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.3.value", "84"); - - attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.4.value", "1"); - - runner.enqueue(sql.getBytes(), attributes); - runner.run(); - - // should fail because of the semicolon - runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1); - } - - - @Test - public void testMultipleFlowFilesSuccessfulInTransaction() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - - 
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(PutSQL.BATCH_SIZE, "1"); - - final Map<String, String> attributes = new HashMap<>(); - attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.1.value", "1"); - - attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); - attributes.put("sql.args.2.value", "Mark"); - - attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); - attributes.put("sql.args.3.value", "84"); - - attributes.put("fragment.identifier", "1"); - attributes.put("fragment.count", "2"); - attributes.put("fragment.index", "0"); - runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); - runner.run(); - - // No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier - runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0); - - attributes.clear(); - attributes.put("fragment.identifier", "1"); - attributes.put("fragment.count", "2"); - attributes.put("fragment.index", "1"); - - runner.clearTransferState(); - runner.enqueue("UPDATE PERSONS SET NAME='Leonard' WHERE ID=1".getBytes(), attributes); - runner.run(); - - // Both FlowFiles with fragment identifier 1 should be successful - runner.assertTransferCount(PutSQL.REL_SUCCESS, 2); - runner.assertTransferCount(PutSQL.REL_FAILURE, 0); - runner.assertTransferCount(PutSQL.REL_RETRY, 0); - for (final MockFlowFile mff : runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS)) { - mff.assertAttributeEquals("fragment.identifier", "1"); - } - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("Leonard", rs.getString(2)); - assertEquals(84, rs.getInt(3)); - assertFalse(rs.next()); - } - } - } - - - @Test - public void testTransactionTimeout() 
throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - - runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs"); - runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); - final Map<String, String> attributes = new HashMap<>(); - attributes.put("fragment.identifier", "1"); - attributes.put("fragment.count", "2"); - attributes.put("fragment.index", "0"); - - final MockFlowFile mff = new MockFlowFile(0L) { - @Override - public Long getLastQueueDate() { - return System.currentTimeMillis() - 10000L; // return 10 seconds ago - } - - @Override - public Map<String, String> getAttributes() { - return attributes; - } - - @Override - public String getAttribute(final String attrName) { - return attributes.get(attrName); - } - }; - - runner.enqueue(mff); - runner.run(); - - // No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier - runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); - } - - /** - * Simple implementation only for testing purposes - */ - private static class MockDBCPService extends AbstractControllerService implements DBCPService { - private final String dbLocation; - - public MockDBCPService(final String dbLocation) { - this.dbLocation = dbLocation; - } - - @Override - public String getIdentifier() { - return "dbcp"; - } - - @Override - public Connection getConnection() throws ProcessException { - try { - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - final Connection conn = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true"); - return conn; - } catch (final Exception e) { - e.printStackTrace(); - throw new 
ProcessException("getConnection failed: " + e); - } - } - } - - /** - * Simple implementation only for testing purposes - */ - private static class SQLExceptionService extends AbstractControllerService implements DBCPService { - private final DBCPService service; - private int allowedBeforeFailure = 0; - private int successful = 0; - - public SQLExceptionService(final DBCPService service) { - this.service = service; - } - - @Override - public String getIdentifier() { - return "dbcp"; - } - - @Override - public Connection getConnection() throws ProcessException { - try { - if (++successful > allowedBeforeFailure) { - final Connection conn = Mockito.mock(Connection.class); - Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException")); - return conn; - } else { - return service.getConnection(); - } - } catch (final Exception e) { - e.printStackTrace(); - throw new ProcessException("getConnection failed: " + e); - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +public class TestPutSQL { + private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; + private static final String createPersonsAutoId = "CREATE TABLE PERSONS (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1), name VARCHAR(100), code INTEGER check(code <= 100))"; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + @Test + public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = 
service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + + runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes()); + runner.run(); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("George", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testInsertWithGeneratedKeys() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes()); + runner.run(); + 
+ runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + final MockFlowFile mff = runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS).get(0); + mff.assertAttributeEquals("sql.generated.key", "1"); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testFailInMiddleWithBadStatement() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes()); + runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes()); + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes()); + runner.run(); + + runner.assertTransferCount(PutSQL.REL_FAILURE, 1); + runner.assertTransferCount(PutSQL.REL_SUCCESS, 3); + } + + + @Test + public void testFailInMiddleWithBadParameterType() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File 
tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final Map<String, String> goodAttributes = new HashMap<>(); + goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + goodAttributes.put("sql.args.1.value", "84"); + + final Map<String, String> badAttributes = new HashMap<>(); + badAttributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR)); + badAttributes.put("sql.args.1.value", "hello"); + + final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes(); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, badAttributes); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, goodAttributes); + runner.run(); + + runner.assertTransferCount(PutSQL.REL_FAILURE, 1); + runner.assertTransferCount(PutSQL.REL_SUCCESS, 3); + } + + + @Test + public void testFailInMiddleWithBadParameterValue() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + 
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final Map<String, String> goodAttributes = new HashMap<>(); + goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + goodAttributes.put("sql.args.1.value", "84"); + + final Map<String, String> badAttributes = new HashMap<>(); + badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + badAttributes.put("sql.args.1.value", "9999"); + + final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes(); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, badAttributes); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, goodAttributes); + runner.run(); + + runner.assertTransferCount(PutSQL.REL_SUCCESS, 1); + runner.assertTransferCount(PutSQL.REL_FAILURE, 1); + runner.assertTransferCount(PutSQL.REL_RETRY, 2); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testUsingSqlDataTypesWithNegativeValues() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate("CREATE TABLE PERSONS (id integer primary key, name varchar(100), code bigint)"); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + final Map<String, String> attributes = new 
HashMap<>(); + attributes.put("sql.args.1.type", "-5"); + attributes.put("sql.args.1.value", "84"); + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + @Test + public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = 
conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + + runner.clearTransferState(); + + attributes.clear(); + attributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.1.value", "George"); + + attributes.put("sql.args.2.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.2.value", "1"); + + runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("George", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", 
String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + runner.run(); + + // should fail because of the semicolon + runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testWithNullParameter() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); + runner.run(); + + 
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(0, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + @Test + public void testInvalidStatement() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; "; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + runner.run(); + + // should fail because of the semicolon + runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = 
conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testRetryableFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final DBCPService service = new SQLExceptionService(null); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + runner.run(); + + // should be routed to retry (not failure): SQLExceptionService hands out a mocked Connection whose prepareStatement throws SQLException + runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1); + } + + + @Test + public void testMultipleFlowFilesSuccessfulInTransaction() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + 
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(PutSQL.BATCH_SIZE, "1"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("fragment.identifier", "1"); + attributes.put("fragment.count", "2"); + attributes.put("fragment.index", "0"); + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); + runner.run(); + + // No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0); + + attributes.clear(); + attributes.put("fragment.identifier", "1"); + attributes.put("fragment.count", "2"); + attributes.put("fragment.index", "1"); + + runner.clearTransferState(); + runner.enqueue("UPDATE PERSONS SET NAME='Leonard' WHERE ID=1".getBytes(), attributes); + runner.run(); + + // Both FlowFiles with fragment identifier 1 should be successful + runner.assertTransferCount(PutSQL.REL_SUCCESS, 2); + runner.assertTransferCount(PutSQL.REL_FAILURE, 0); + runner.assertTransferCount(PutSQL.REL_RETRY, 0); + for (final MockFlowFile mff : runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS)) { + mff.assertAttributeEquals("fragment.identifier", "1"); + } + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Leonard", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testTransactionTimeout() 
throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + final Map<String, String> attributes = new HashMap<>(); + attributes.put("fragment.identifier", "1"); + attributes.put("fragment.count", "2"); + attributes.put("fragment.index", "0"); + + final MockFlowFile mff = new MockFlowFile(0L) { + @Override + public Long getLastQueueDate() { + return System.currentTimeMillis() - 10000L; // return 10 seconds ago + } + + @Override + public Map<String, String> getAttributes() { + return attributes; + } + + @Override + public String getAttribute(final String attrName) { + return attributes.get(attrName); + } + }; + + runner.enqueue(mff); + runner.run(); + + // Only fragment 0 of 2 was enqueued and it reports being queued 10 seconds ago, longer than the 5 secs Transaction Timeout, so the test expects it to be routed to failure + runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); + } + + /** + * Test-only DBCPService that opens a new embedded Derby connection to dbLocation on every getConnection call, using create=true in the JDBC URL + */ + private static class MockDBCPService extends AbstractControllerService implements DBCPService { + private final String dbLocation; + + public MockDBCPService(final String dbLocation) { + this.dbLocation = dbLocation; + } + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Connection conn = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true"); + return conn; + } catch (final Exception e) { + e.printStackTrace(); + throw new 
ProcessException("getConnection failed: " + e); + } + } + } + + /** + * Test-only DBCPService that injects a SQL failure: once more than allowedBeforeFailure connections have been requested (it is 0 here, so from the first request) getConnection returns a Mockito Connection whose prepareStatement throws SQLException; earlier requests are delegated to the wrapped service + */ + private static class SQLExceptionService extends AbstractControllerService implements DBCPService { + private final DBCPService service; + private int allowedBeforeFailure = 0; + private int successful = 0; + + public SQLExceptionService(final DBCPService service) { + this.service = service; + } + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + if (++successful > allowedBeforeFailure) { + final Connection conn = Mockito.mock(Connection.class); + Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException")); + return conn; + } else { + return service.getConnection(); + } + } catch (final Exception e) { + e.printStackTrace(); + throw new ProcessException("getConnection failed: " + e); + } + } + } +}
