Repository: nifi Updated Branches: refs/heads/master dbe8ff3f4 -> ef80549d6
NIFI-1093 added support for handling non-required columns Mostly based on contribution from Daniel Cave Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/01268002 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/01268002 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/01268002 Branch: refs/heads/master Commit: 012680020173a2e3db2df2f7a1cb633bf356c2f0 Parents: 73c0637 Author: Oleg Zhurakousky <[email protected]> Authored: Fri Jan 29 13:41:27 2016 -0500 Committer: Oleg Zhurakousky <[email protected]> Committed: Fri Jan 29 13:41:27 2016 -0500 ---------------------------------------------------------------------- .../processors/standard/ConvertJSONToSQL.java | 47 ++++- .../standard/TestConvertJSONToSQL.java | 204 +++++++++++++++++++ 2 files changed, 245 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/01268002/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java index 4125eeb..f8306a2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -100,6 +100,15 @@ public class ConvertJSONToSQL extends AbstractProcessor { "Any field in the JSON document that cannot be mapped to a column in the database is ignored"); static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail", "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship"); + static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns", + "Ignore Unmatched Columns", + "Any column in the database that does not have a field in the JSON document will be assumed to not be required. No notification will be logged"); + static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warn on Unmatched Columns", + "Warning Unmatched Columns", + "Any column in the database that does not have a field in the JSON document will be assumed to not be required. A warning will be logged"); + static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail on Unmatched Columns", + "Fail on Unmatched Columns", + "A flow will fail if any column in the database that does not have a field in the JSON document. An error will be logged"); static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder() .name("JDBC Connection Pool") @@ -159,6 +168,12 @@ public class ConvertJSONToSQL extends AbstractProcessor { .expressionLanguageSupported(true) .build(); + static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new PropertyDescriptor.Builder() + .name("Unmatched Column Behavior") + .description("If an incoming JSON element does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation") + .allowableValues(IGNORE_UNMATCHED_COLUMN, WARNING_UNMATCHED_COLUMN ,FAIL_UNMATCHED_COLUMN) + .defaultValue(FAIL_UNMATCHED_COLUMN.getValue()) + .build(); static final Relationship REL_ORIGINAL = new Relationship.Builder() .name("original") @@ -193,6 +208,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { properties.add(SCHEMA_NAME); properties.add(TRANSLATE_FIELD_NAMES); properties.add(UNMATCHED_FIELD_BEHAVIOR); + properties.add(UNMATCHED_COLUMN_BEHAVIOR); properties.add(UPDATE_KEY); return properties; } @@ -233,6 +249,10 @@ public class ConvertJSONToSQL extends AbstractProcessor { final SchemaKey schemaKey = new SchemaKey(catalog, tableName); final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null; + // Is the unmatched column behaviour fail or warning? + final boolean failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue()); + final boolean warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue()); + // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the @@ -309,9 +329,9 @@ public class ConvertJSONToSQL extends AbstractProcessor { final String fqTableName = tableNameBuilder.toString(); if (INSERT_TYPE.equals(statementType)) { - sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields); + sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns); } else { - sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields); + sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns); } } catch (final ProcessException pe) { getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure", @@ -359,13 +379,20 @@ public class ConvertJSONToSQL extends AbstractProcessor { } private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, - final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) { + final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, + final boolean warningUnmappedColumns) { final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames); for (final String requiredColName : schema.getRequiredColumnNames()) { final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames); if (!normalizedFieldNames.contains(normalizedColName)) { - throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'"); + String missingColMessage = "JSON does not have a value for the Required column '" + requiredColName + "'"; + if (failUnmappedColumns) { + getLogger().error(missingColMessage); + throw new ProcessException(missingColMessage); + } else if (warningUnmappedColumns) { + getLogger().warn(missingColMessage); + } } } @@ -426,7 +453,8 @@ public class ConvertJSONToSQL extends AbstractProcessor { } private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys, - final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) { + final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, + final boolean warningUnmappedColumns) { final Set<String> updateKeyNames; if (updateKeys == null) { @@ -456,7 +484,14 @@ public class ConvertJSONToSQL extends AbstractProcessor { normalizedUpdateNames.add(normalizedUK); if (!normalizedFieldNames.contains(normalizedUK)) { - throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'"); + String missingColMessage = "JSON does not have a value for the " + + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'"; + if (failUnmappedColumns) { + getLogger().error(missingColMessage); + throw new ProcessException(missingColMessage); + } else if (warningUnmappedColumns) { + getLogger().warn(missingColMessage); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/01268002/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java index 7422dbc..f4d5b96 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java @@ -400,6 +400,210 @@ public class TestConvertJSONToSQL { runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); } + @Test + public void testInsertWithMissingColumnFail() + throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate( + "CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)"); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail on Unmatched Columns"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); + } // End testInsertWithMissingColumnFail() + + @Test + public void testInsertWithMissingColumnWarning() + throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate( + "CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)"); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warn on Unmatched Columns"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + } // End testInsertWithMissingColumnWarning() + + @Test + public void testInsertWithMissingColumnIgnore() + throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate( + "CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)"); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Ignore Unmatched Columns"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + } // End testInsertWithMissingColumnIgnore() + + @Test + public void testUpdateWithMissingColumnFail() + throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra"); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail on Unmatched Columns"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1); + } // End testUpdateWithMissingColumnFail() + + @Test + public void testUpdateWithMissingColumnWarning() + throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra"); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warn on Unmatched Columns"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?"); + + } // End testUpdateWithMissingColumnWarning() + + @Test + public void testUpdateWithMissingColumnIgnore() + throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersons); + } + } + + runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); + runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra"); + runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Ignore Unmatched Columns"); + runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); + runner.run(); + + runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); + out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.1.value", "1"); + out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR)); + out.assertAttributeEquals("sql.args.2.value", "Mark"); + out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); + out.assertAttributeEquals("sql.args.3.value", "48"); + + out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?"); + + } // End testUpdateWithMissingColumnIgnore() + /** * Simple implementation only for testing purposes
