Repository: nifi Updated Branches: refs/heads/master 40acd4a6e -> c4d0c0bbd
NIFI-3626: Add support for DELETE in ConvertJSONToSQL This closes #1605. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c4d0c0bb Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c4d0c0bb Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c4d0c0bb Branch: refs/heads/master Commit: c4d0c0bbd18759100641d0cbca0b584f8f20a813 Parents: 40acd4a Author: Matt Burgess <[email protected]> Authored: Mon Mar 20 15:05:46 2017 -0400 Committer: Bryan Bende <[email protected]> Committed: Tue Mar 21 17:20:27 2017 -0400 ---------------------------------------------------------------------- .../processors/standard/ConvertJSONToSQL.java | 90 +++++++++++- .../standard/TestConvertJSONToSQL.java | 144 +++++++++++++++++++ 2 files changed, 230 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c4d0c0bb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java index 4d8f462..2eb9cad 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -74,8 +74,8 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttribu @SupportsBatching @SeeAlso(PutSQL.class) @InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"}) -@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be " +@Tags({"json", "sql", "database", "rdbms", "insert", "update", "delete", "relational", "flat"}) +@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE, INSERT, or DELETE SQL statement. The incoming FlowFile is expected to be " + "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to " + "a JSON object, that JSON object will be interpreted as Text. If the input is an array of JSON elements, each element in the array is " + "output as a separate FlowFile to the 'sql' relationship. Upon successful conversion, the original FlowFile is routed to the 'original' " @@ -102,6 +102,7 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttribu public class ConvertJSONToSQL extends AbstractProcessor { private static final String UPDATE_TYPE = "UPDATE"; private static final String INSERT_TYPE = "INSERT"; + private static final String DELETE_TYPE = "DELETE"; static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields", "Any field in the JSON document that cannot be mapped to a column in the database is ignored"); @@ -128,7 +129,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { .name("Statement Type") .description("Specifies the type of SQL Statement to generate") .required(true) - .allowableValues(UPDATE_TYPE, INSERT_TYPE) + .allowableValues(UPDATE_TYPE, INSERT_TYPE, DELETE_TYPE) .build(); static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() .name("Table Name") @@ -363,9 +364,12 @@ public class ConvertJSONToSQL extends AbstractProcessor { if (INSERT_TYPE.equals(statementType)) { sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName); - } else { + } else if (UPDATE_TYPE.equals(statementType)) { sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName); + } else { + sql = generateDelete(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields, + failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName); } } catch (final ProcessException pe) { getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure", @@ -650,6 +654,84 @@ public class ConvertJSONToSQL extends AbstractProcessor { return sqlBuilder.toString(); } + private String generateDelete(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, + final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, + final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) { + final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames); + for (final String requiredColName : schema.getRequiredColumnNames()) { + final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames); + if (!normalizedFieldNames.contains(normalizedColName)) { + String missingColMessage = "JSON does not have a value for the Required column '" + requiredColName + "'"; + if (failUnmappedColumns) { + getLogger().error(missingColMessage); + throw new ProcessException(missingColMessage); + } else if (warningUnmappedColumns) { + getLogger().warn(missingColMessage); + } + } + } + + final StringBuilder sqlBuilder = new StringBuilder(); + int fieldCount = 0; + sqlBuilder.append("DELETE FROM "); + if (quoteTableName) { + sqlBuilder.append(schema.getQuotedIdentifierString()) + .append(tableName) + .append(schema.getQuotedIdentifierString()); + } else { + sqlBuilder.append(tableName); + } + + sqlBuilder.append(" WHERE "); + + // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as + // adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the + // columns that we are inserting into + final Iterator<String> fieldNames = rootNode.getFieldNames(); + while (fieldNames.hasNext()) { + final String fieldName = fieldNames.next(); + + final ColumnDescription desc = schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames)); + if (desc == null && !ignoreUnmappedFields) { + throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database"); + } + + if (desc != null) { + if (fieldCount++ > 0) { + sqlBuilder.append(" AND "); + } + + if (escapeColumnNames) { + sqlBuilder.append(schema.getQuotedIdentifierString()) + .append(desc.getColumnName()) + .append(schema.getQuotedIdentifierString()); + } else { + sqlBuilder.append(desc.getColumnName()); + } + sqlBuilder.append(" = ?"); + + final int sqlType = desc.getDataType(); + attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType)); + + final Integer colSize = desc.getColumnSize(); + final JsonNode fieldNode = rootNode.get(fieldName); + if (!fieldNode.isNull()) { + String fieldValue = fieldNode.asText(); + if (colSize != null && fieldValue.length() > colSize) { + fieldValue = fieldValue.substring(0, colSize); + } + attributes.put("sql.args." + fieldCount + ".value", fieldValue); + } + } + } + + if (fieldCount == 0) { + throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table"); + } + + return sqlBuilder.toString(); + } + private static String normalizeColumnName(final String colName, final boolean translateColumnNames) { return translateColumnNames ? colName.toUpperCase().replace("_", "") : colName; } http://git-wip-us.apache.org/repos/asf/nifi/blob/c4d0c0bb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java index cf1459c..bc9d7f9 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java @@ -833,6 +833,150 @@ public class TestConvertJSONToSQL { } // End testUpdateWithMissingColumnIgnore() + @Test + public void testDelete() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "DELETE"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("DELETE FROM PERSONS WHERE ID = ? AND NAME = ? AND CODE = ?"); + } + + @Test + public void testDeleteQuotedIdentifiers() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "DELETE"); + runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true"); + + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("DELETE FROM PERSONS WHERE \"ID\" = ? AND \"NAME\" = ? AND \"CODE\" = ?"); + } + + @Test + public void testDeleteQuotedTableIdentifier() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "DELETE"); + runner.setProperty(ConvertJSONToSQL.QUOTED_TABLE_IDENTIFIER, "true"); + + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("DELETE FROM \"PERSONS\" WHERE ID = ? AND NAME = ? AND CODE = ?"); + } + + @Test + public void testDeleteWithNullValue() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "DELETE"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeNotExists("sql.args.3.value"); + + out.assertContentEquals("DELETE FROM PERSONS WHERE ID = ? AND NAME = ? AND CODE = ?"); + } + /** * Simple implementation only for testing purposes */
