http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java new file mode 100644 index 0000000..63073e6 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestConvertJSONToSQL { + static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + @Test + public void testInsert() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + 
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + } + + @Test + public void testInsertWithNullValue() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + 
out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeNotExists("sql.args.3.value"); + + out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + } + + + @Test + public void testUpdateWithNullValue() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.1.value", "Mark"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeNotExists("sql.args.2.value"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + 
out.assertAttributeEquals("sql.args.3.value", "1"); + + out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?"); + } + + + @Test + public void testMultipleInserts() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/persons.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5); + final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL); + for (final MockFlowFile mff : mffs) { + mff.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + + for (int i=1; i <= 3; i++) { + mff.assertAttributeExists("sql.args." + i + ".type"); + mff.assertAttributeExists("sql.args." 
+ i + ".value"); + } + } + } + + @Test + public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.1.value", "Mark"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.2.value", "48"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "1"); + + out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? 
WHERE ID = ?"); + } + + @Test + public void testUnmappedFieldBehavior() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, ConvertJSONToSQL.IGNORE_UNMATCHED_FIELD.getValue()); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + + out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + + runner.clearTransferState(); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_FIELD_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_FIELD.getValue()); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json")); + runner.run(); + runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); + } + + @Test + public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new 
File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? 
WHERE CODE = ?"); + } + + @Test + public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? 
AND CODE = ?"); + } + + @Test + public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-code.json")); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); + } + + @Test + public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, 
code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json")); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); + } + + @Test + public void testInsertWithMissingField() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-without-id.json")); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); + } + + + /** + * Simple implementation only for testing purposes + */ + private static class MockDBCPService extends AbstractControllerService implements DBCPService { + private final String dbLocation; + + public MockDBCPService(final String dbLocation) { + this.dbLocation = dbLocation; + } + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Connection con = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true"); + return con; + } catch (final Exception e) { + e.printStackTrace(); + throw new ProcessException("getConnection 
failed: " + e); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java new file mode 100644 index 0000000..a348c9e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -0,0 +1,664 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +public class TestPutSQL { + private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; + private static final String createPersonsAutoId = "CREATE TABLE PERSONS (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1), name VARCHAR(100), code INTEGER check(code <= 100))"; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + @Test + public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = 
service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + + runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes()); + runner.run(); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("George", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + @Test + public void testInsertWithGeneratedKeys() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes()); + runner.run(); + + 
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + final MockFlowFile mff = runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS).get(0); + mff.assertAttributeEquals("sql.generated.key", "1"); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testFailInMiddleWithBadStatement() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes()); + runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes()); + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes()); + runner.run(); + + runner.assertTransferCount(PutSQL.REL_FAILURE, 1); + runner.assertTransferCount(PutSQL.REL_SUCCESS, 3); + } + + + @Test + public void testFailInMiddleWithBadParameterType() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File 
tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final Map<String, String> goodAttributes = new HashMap<>(); + goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + goodAttributes.put("sql.args.1.value", "84"); + + final Map<String, String> badAttributes = new HashMap<>(); + badAttributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR)); + badAttributes.put("sql.args.1.value", "hello"); + + final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes(); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, badAttributes); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, goodAttributes); + runner.run(); + + runner.assertTransferCount(PutSQL.REL_FAILURE, 1); + runner.assertTransferCount(PutSQL.REL_SUCCESS, 3); + } + + + @Test + public void testFailInMiddleWithBadParameterValue() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + 
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final Map<String, String> goodAttributes = new HashMap<>(); + goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + goodAttributes.put("sql.args.1.value", "84"); + + final Map<String, String> badAttributes = new HashMap<>(); + badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + badAttributes.put("sql.args.1.value", "9999"); + + final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes(); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, badAttributes); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, goodAttributes); + runner.run(); + + runner.assertTransferCount(PutSQL.REL_SUCCESS, 1); + runner.assertTransferCount(PutSQL.REL_FAILURE, 1); + runner.assertTransferCount(PutSQL.REL_RETRY, 2); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + + @Test + public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", 
String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + + runner.clearTransferState(); + + attributes.clear(); + attributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.1.value", "George"); + + attributes.put("sql.args.2.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.2.value", "1"); + + runner.enqueue("UPDATE PERSONS SET NAME=? 
WHERE ID=?".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("George", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + runner.run(); + + // should fail because of the semicolon + runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); + + try (final Connection 
conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testWithNullParameter() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + + runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(0, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + @Test + public void testInvalidStatement() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File 
dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; "; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + runner.run(); + + // should fail because of the semicolon + runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testRetryableFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final DBCPService service = new SQLExceptionService(null); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; 
+ final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + runner.run(); + + // should be routed to retry because the mocked connection throws a SQLException from prepareStatement + runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1); + } + + + @Test + public void testMultipleFlowFilesSuccessfulInTransaction() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(PutSQL.BATCH_SIZE, "1"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.1.value", "1"); + + attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("sql.args.2.value", "Mark"); + + attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("sql.args.3.value", "84"); + + attributes.put("fragment.identifier", "1"); + attributes.put("fragment.count", "2"); + attributes.put("fragment.index", "0"); + runner.enqueue("INSERT INTO PERSONS (ID, 
NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes); + runner.run(); + + // No FlowFiles should be transferred because there were not enough flowfiles with the same fragment identifier + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0); + + attributes.clear(); + attributes.put("fragment.identifier", "1"); + attributes.put("fragment.count", "2"); + attributes.put("fragment.index", "1"); + + runner.clearTransferState(); + runner.enqueue("UPDATE PERSONS SET NAME='Leonard' WHERE ID=1".getBytes(), attributes); + runner.run(); + + // Both FlowFiles with fragment identifier 1 should be successful + runner.assertTransferCount(PutSQL.REL_SUCCESS, 2); + runner.assertTransferCount(PutSQL.REL_FAILURE, 0); + runner.assertTransferCount(PutSQL.REL_RETRY, 0); + for (final MockFlowFile mff : runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS)) { + mff.assertAttributeEquals("fragment.identifier", "1"); + } + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Leonard", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + + + @Test + public void testTransactionTimeout() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs"); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + final Map<String, String> attributes = new HashMap<>(); + attributes.put("fragment.identifier", "1"); + attributes.put("fragment.count", "2"); + 
attributes.put("fragment.index", "0"); + + final MockFlowFile mff = new MockFlowFile(0L) { + @Override + public Long getLastQueueDate() { + return System.currentTimeMillis() - 10000L; // return 10 seconds ago + } + + @Override + public Map<String, String> getAttributes() { + return attributes; + } + + @Override + public String getAttribute(final String attrName) { + return attributes.get(attrName); + } + }; + + runner.enqueue(mff); + runner.run(); + + // The FlowFile should be routed to failure because it was queued 10 seconds ago, exceeding the 5 second Transaction Timeout, and the second fragment never arrived + runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); + } + + /** + * Simple implementation only for testing purposes + */ + private static class MockDBCPService extends AbstractControllerService implements DBCPService { + private final String dbLocation; + + public MockDBCPService(final String dbLocation) { + this.dbLocation = dbLocation; + } + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Connection conn = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true"); + return conn; + } catch (final Exception e) { + e.printStackTrace(); + throw new ProcessException("getConnection failed: " + e); + } + } + } + + /** + * Simple implementation only for testing purposes + */ + private static class SQLExceptionService extends AbstractControllerService implements DBCPService { + private final DBCPService service; + private int allowedBeforeFailure = 0; + private int successful = 0; + + public SQLExceptionService(final DBCPService service) { + this.service = service; + } + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + if (++successful > allowedBeforeFailure) { + final Connection conn = Mockito.mock(Connection.class); + 
Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException")); + return conn; + } else { + return service.getConnection(); + } + } catch (final Exception e) { + e.printStackTrace(); + throw new ProcessException("getConnection failed: " + e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json new file mode 100644 index 0000000..2f82a94 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json @@ -0,0 +1,4 @@ +{ + "id": 1, + "name": "Mark", +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-1.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-1.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-1.json new file mode 100644 index 0000000..8f98447 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-1.json @@ -0,0 +1,5 @@ +{ + "id": 1, + "name": "Mark", + "code": 48 +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json new file mode 100644 index 0000000..3ebd587 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-extra-field.json @@ -0,0 +1,6 @@ +{ + "id": 1, + "name": "Mark", + "code": 48, + "extra": "another" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-null-code.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-null-code.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-null-code.json new file mode 100644 index 0000000..0f491ce --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-with-null-code.json @@ -0,0 +1,5 @@ +{ + "id": 1, + "name": "Mark", + "code": null +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-code.json ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-code.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-code.json new file mode 100644 index 0000000..3ff074a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-code.json @@ -0,0 +1,4 @@ +{ + "id": 1, + "name": "Mark" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-id.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-id.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-id.json new file mode 100644 index 0000000..347cc9e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/person-without-id.json @@ -0,0 +1,4 @@ +{ + "name": "Mark", + "code": 48 +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/persons.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/persons.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/persons.json new file mode 100644 index 0000000..573b698 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertJSONToSQL/persons.json @@ 
-0,0 +1,21 @@ +[{ + "id": 1, + "name": "Mark", + "code": 48 +}, { + "id": 2, + "name": "George", + "code": 48 +}, { + "id": 3, + "name": "Harry", + "code": 21 +}, { + "id": 4, + "name": "Julie", + "code": 48 +}, { + "id": 82, + "name": "Frank Henry", + "code": 16 +}] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/28549c2b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java deleted file mode 100644 index 57d855f..0000000 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertFlatJSONToSQL.java +++ /dev/null @@ -1,600 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processors.standard; - -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.dbcp.DBCPService; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.ObjectHolder; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.map.ObjectMapper; - - -@SideEffectFree -@SupportsBatching -@SeeAlso(PutSQL.class) -@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"}) -@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. 
The incoming FlowFile is expected to be " - + "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to " - + "a JSON object, that JSON object will be interpreted as Text. Upon successful conversion, the original FlowFile is routed to the 'original' " - + "relationship and the SQL is routed to the 'sql' relationship.") -public class ConvertFlatJSONToSQL extends AbstractProcessor { - private static final String UPDATE_TYPE = "UPDATE"; - private static final String INSERT_TYPE = "INSERT"; - - static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder() - .name("JDBC Connection Pool") - .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. " - + "The Connection Pool is necessary in order to determine the appropriate database column types.") - .identifiesControllerService(DBCPService.class) - .required(true) - .build(); - static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder() - .name("Statement Type") - .description("Specifies the type of SQL Statement to generate") - .required(true) - .allowableValues(UPDATE_TYPE, INSERT_TYPE) - .build(); - static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() - .name("Table Name") - .description("The name of the table that the statement should update") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder() - .name("Catalog Name") - .description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. 
In this case, leave the field empty") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder() - .name("Translate Field Names") - .description("If true, the Processor will attempt to translate JSON field names into the appropriate column names for the table specified. " - + "If false, the JSON field names must match the column names exactly, or the column will not be updated") - .allowableValues("true", "false") - .defaultValue("true") - .build(); - static final PropertyDescriptor UPDATE_KEY = new PropertyDescriptor.Builder() - .name("Update Keys") - .description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. " - + "If the Statement Type is UPDATE and this property is not set, the table's Primary Keys are used. " - + "In this case, if no Primary Key exists, the conversion to SQL will fail. " - + "This property is ignored if the Statement Type is INSERT") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(false) - .expressionLanguageSupported(true) - .build(); - - - static final Relationship REL_ORIGINAL = new Relationship.Builder() - .name("original") - .description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship") - .build(); - static final Relationship REL_SQL = new Relationship.Builder() - .name("sql") - .description("A FlowFile is routed to this relationship when its contents have successfully been converted into a SQL statement") - .build(); - static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("A FlowFile is routed to this relationship if it cannot be converted into a SQL statement. 
Common causes include invalid JSON " - + "content or the JSON content missing a required field (if using an INSERT statement type).") - .build(); - - private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(100) { - private static final long serialVersionUID = 1L; - - @Override - protected boolean removeEldestEntry(Map.Entry<SchemaKey,TableSchema> eldest) { - return true; - } - }; - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(CONNECTION_POOL); - properties.add(STATEMENT_TYPE); - properties.add(TABLE_NAME); - properties.add(CATALOG_NAME); - properties.add(TRANSLATE_FIELD_NAMES); - properties.add(UPDATE_KEY); - return properties; - } - - - @Override - public Set<Relationship> getRelationships() { - final Set<Relationship> rels = new HashSet<>(); - rels.add(REL_ORIGINAL); - rels.add(REL_SQL); - rels.add(REL_FAILURE); - return rels; - } - - - @OnScheduled - public void onScheduled(final ProcessContext context) { - synchronized (this) { - schemaCache.clear(); - } - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean(); - final String statementType = context.getProperty(STATEMENT_TYPE).getValue(); - final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue(); - - final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue(); - final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); - final SchemaKey schemaKey = new SchemaKey(catalog, tableName); - final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys 
== null; - - // get the database schema from the cache, if one exists - TableSchema schema; - synchronized (this) { - schema = schemaCache.get(schemaKey); - if (schema == null) { - // No schema exists for this table yet. Query the database to determine the schema and put it into the cache. - final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); - try (final Connection conn = dbcpService.getConnection()) { - schema = TableSchema.from(conn, catalog, tableName, translateFieldNames, includePrimaryKeys); - schemaCache.put(schemaKey, schema); - } catch (final SQLException e) { - getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e); - session.transfer(flowFile, REL_FAILURE); - return; - } - } - } - - // Parse the JSON document - final ObjectMapper mapper = new ObjectMapper(); - final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null); - try { - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - try (final InputStream bufferedIn = new BufferedInputStream(in)) { - rootNodeRef.set(mapper.readTree(bufferedIn)); - } - } - }); - } catch (final ProcessException pe) { - getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe); - session.transfer(flowFile, REL_FAILURE); - return; - } - - final JsonNode rootNode = rootNodeRef.get(); - final String sql; - final Map<String, String> attributes = new HashMap<>(); - if (INSERT_TYPE.equals(statementType)) { - try { - sql = generateInsert(rootNode, attributes, tableName, schema, translateFieldNames); - } catch (final ProcessException pe) { - getLogger().error("Failed to convert {} to a SQL INSERT statement due to {}; routing to failure", - new Object[] { flowFile, pe.toString() }, pe); - session.transfer(flowFile, REL_FAILURE); - return; - } - } else { 
- try { - sql = generateUpdate(rootNode, attributes, tableName, updateKeys, schema, translateFieldNames); - } catch (final ProcessException pe) { - getLogger().error("Failed to convert {} to a SQL UPDATE statement due to {}; routing to failure", - new Object[] { flowFile, pe.toString() }, pe); - session.transfer(flowFile, REL_FAILURE); - return; - } - } - - FlowFile sqlFlowFile = session.create(flowFile); - sqlFlowFile = session.write(sqlFlowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(sql.getBytes(StandardCharsets.UTF_8)); - } - }); - - attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); - attributes.put("sql.table", tableName); - if (catalog != null) { - attributes.put("sql.catalog", catalog); - } - - sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes); - - session.transfer(flowFile, REL_ORIGINAL); - session.transfer(sqlFlowFile, REL_SQL); - } - - private Set<String> getNormalizedColumnNames(final JsonNode node, final boolean translateFieldNames) { - final Set<String> normalizedFieldNames = new HashSet<>(); - final Iterator<String> fieldNameItr = node.getFieldNames(); - while (fieldNameItr.hasNext()) { - normalizedFieldNames.add(normalizeColumnName(fieldNameItr.next(), translateFieldNames)); - } - - return normalizedFieldNames; - } - - private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, - final TableSchema schema, final boolean translateFieldNames) { - - final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames); - for (final String requiredColName : schema.getRequiredColumnNames()) { - final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames); - if (!normalizedFieldNames.contains(normalizedColName)) { - throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'"); - } - } - - final 
StringBuilder sqlBuilder = new StringBuilder(); - int fieldCount = 0; - sqlBuilder.append("INSERT INTO ").append(tableName).append(" ("); - - // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as - // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the - // columns that we are inserting into - final Iterator<String> fieldNames = rootNode.getFieldNames(); - while (fieldNames.hasNext()) { - final String fieldName = fieldNames.next(); - - final ColumnDescription desc = schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames)); - if (desc != null) { - if (fieldCount++ > 0) { - sqlBuilder.append(", "); - } - - sqlBuilder.append(desc.getColumnName()); - - final int sqlType = desc.getDataType(); - attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); - - final Integer colSize = desc.getColumnSize(); - String fieldValue = rootNode.get(fieldName).asText(); - if (colSize != null && fieldValue.length() > colSize) { - fieldValue = fieldValue.substring(0, colSize); - } - attributes.put("sql.args." + fieldCount + ".value", fieldValue); - } - } - - // complete the SQL statements by adding ?'s for all of the values to be escaped. 
- sqlBuilder.append(") VALUES ("); - for (int i=0; i < fieldCount; i++) { - if (i > 0) { - sqlBuilder.append(", "); - } - - sqlBuilder.append("?"); - } - sqlBuilder.append(")"); - - if (fieldCount == 0) { - throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table"); - } - - return sqlBuilder.toString(); - } - - private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys, - final TableSchema schema, final boolean translateFieldNames) { - - final Set<String> updateKeyNames; - if (updateKeys == null) { - updateKeyNames = schema.getPrimaryKeyColumnNames(); - } else { - updateKeyNames = new HashSet<>(); - for (final String updateKey : updateKeys.split(",")) { - updateKeyNames.add(updateKey.trim()); - } - } - - if (updateKeyNames.isEmpty()) { - throw new ProcessException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified"); - } - - final StringBuilder sqlBuilder = new StringBuilder(); - int fieldCount = 0; - sqlBuilder.append("UPDATE ").append(tableName).append(" SET "); - - - // Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON - // for each of the Update Key fields. - final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames); - final Set<String> normalizedUpdateNames = new HashSet<>(); - for (final String uk : updateKeyNames) { - final String normalizedUK = normalizeColumnName(uk, translateFieldNames); - normalizedUpdateNames.add(normalizedUK); - - if (!normalizedFieldNames.contains(normalizedUK)) { - throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? 
"Primary" : "Update") + "Key column '" + uk + "'"); - } - } - - // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as - // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the - // columns that we are inserting into - Iterator<String> fieldNames = rootNode.getFieldNames(); - while (fieldNames.hasNext()) { - final String fieldName = fieldNames.next(); - - final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames); - final ColumnDescription desc = schema.getColumns().get(normalizedColName); - if (desc == null) { - continue; - } - - // Check if this column is an Update Key. If so, skip it for now. We will come - // back to it after we finish the SET clause - if (normalizedUpdateNames.contains(normalizedColName)) { - continue; - } - - if (fieldCount++ > 0) { - sqlBuilder.append(", "); - } - - sqlBuilder.append(desc.getColumnName()).append(" = ?"); - final int sqlType = desc.getDataType(); - attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); - - final Integer colSize = desc.getColumnSize(); - String fieldValue = rootNode.get(fieldName).asText(); - if (colSize != null && fieldValue.length() > colSize) { - fieldValue = fieldValue.substring(0, colSize); - } - attributes.put("sql.args." + fieldCount + ".value", fieldValue); - } - - // Set the WHERE clause based on the Update Key values - sqlBuilder.append(" WHERE "); - - fieldNames = rootNode.getFieldNames(); - int whereFieldCount = 0; - while (fieldNames.hasNext()) { - final String fieldName = fieldNames.next(); - - final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames); - final ColumnDescription desc = schema.getColumns().get(normalizedColName); - if (desc == null) { - continue; - } - - // Check if this column is a Update Key. If so, skip it for now. 
We will come - // back to it after we finish the SET clause - if (!normalizedUpdateNames.contains(normalizedColName)) { - continue; - } - - if (whereFieldCount++ > 0) { - sqlBuilder.append(" AND "); - } - fieldCount++; - - sqlBuilder.append(normalizedColName).append(" = ?"); - final int sqlType = desc.getDataType(); - attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); - - final Integer colSize = desc.getColumnSize(); - String fieldValue = rootNode.get(fieldName).asText(); - if (colSize != null && fieldValue.length() > colSize) { - fieldValue = fieldValue.substring(0, colSize); - } - attributes.put("sql.args." + fieldCount + ".value", fieldValue); - } - - return sqlBuilder.toString(); - } - - private static String normalizeColumnName(final String colName, final boolean translateColumnNames) { - return translateColumnNames ? colName.toUpperCase().replace("_", "") : colName; - } - - private static class TableSchema { - private List<String> requiredColumnNames; - private Set<String> primaryKeyColumnNames; - private Map<String, ColumnDescription> columns; - - private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames, - final Set<String> primaryKeyColumnNames) { - this.columns = new HashMap<>(); - this.primaryKeyColumnNames = primaryKeyColumnNames; - - this.requiredColumnNames = new ArrayList<>(); - for (final ColumnDescription desc : columnDescriptions) { - columns.put(ConvertFlatJSONToSQL.normalizeColumnName(desc.columnName, translateColumnNames), desc); - if (desc.isRequired()) { - requiredColumnNames.add(desc.columnName); - } - } - } - - public Map<String, ColumnDescription> getColumns() { - return columns; - } - - public List<String> getRequiredColumnNames() { - return requiredColumnNames; - } - - public Set<String> getPrimaryKeyColumnNames() { - return primaryKeyColumnNames; - } - - public static TableSchema from(final Connection conn, final String catalog, final String tableName, - final 
boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException { - final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%"); - - final List<ColumnDescription> cols = new ArrayList<>(); - while (colrs.next()) { - final ColumnDescription col = ColumnDescription.from(colrs); - cols.add(col); - } - - final Set<String> primaryKeyColumns = new HashSet<>(); - if (includePrimaryKeys) { - final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName); - - while (pkrs.next()) { - final String colName = pkrs.getString("COLUMN_NAME"); - primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames)); - } - } - - return new TableSchema(cols, translateColumnNames, primaryKeyColumns); - } - } - - private static class ColumnDescription { - private final String columnName; - private final int dataType; - private final boolean required; - private final Integer columnSize; - - private ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) { - this.columnName = columnName; - this.dataType = dataType; - this.required = required; - this.columnSize = columnSize; - } - - public int getDataType() { - return dataType; - } - - public Integer getColumnSize() { - return columnSize; - } - - public String getColumnName() { - return columnName; - } - - public boolean isRequired() { - return required; - } - - public static ColumnDescription from(final ResultSet resultSet) throws SQLException { - final String columnName = resultSet.getString("COLUMN_NAME"); - final int dataType = resultSet.getInt("DATA_TYPE"); - final int colSize = resultSet.getInt("COLUMN_SIZE"); - - final String nullableValue = resultSet.getString("IS_NULLABLE"); - final boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty(); - final String defaultValue = resultSet.getString("COLUMN_DEF"); - final String autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT"); 
- final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue); - final boolean required = !isNullable && !isAutoIncrement && defaultValue == null; - - return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize); - } - } - - private static class SchemaKey { - private final String catalog; - private final String tableName; - - public SchemaKey(final String catalog, final String tableName) { - this.catalog = catalog; - this.tableName = tableName; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((catalog == null) ? 0 : catalog.hashCode()); - result = prime * result + ((tableName == null) ? 0 : tableName.hashCode()); - return result; - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - - final SchemaKey other = (SchemaKey) obj; - if (catalog == null) { - if (other.catalog != null) { - return false; - } - } else if (!catalog.equals(other.catalog)) { - return false; - } - - - if (tableName == null) { - if (other.tableName != null) { - return false; - } - } else if (!tableName.equals(other.tableName)) { - return false; - } - - return true; - } - } -}
