Repository: nifi
Updated Branches:
  refs/heads/master dbe8ff3f4 -> ef80549d6


NIFI-1093 Added support for handling non-required columns
Mostly based on a contribution from Daniel Cave


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/01268002
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/01268002
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/01268002

Branch: refs/heads/master
Commit: 012680020173a2e3db2df2f7a1cb633bf356c2f0
Parents: 73c0637
Author: Oleg Zhurakousky <[email protected]>
Authored: Fri Jan 29 13:41:27 2016 -0500
Committer: Oleg Zhurakousky <[email protected]>
Committed: Fri Jan 29 13:41:27 2016 -0500

----------------------------------------------------------------------
 .../processors/standard/ConvertJSONToSQL.java   |  47 ++++-
 .../standard/TestConvertJSONToSQL.java          | 204 +++++++++++++++++++
 2 files changed, 245 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/01268002/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 4125eeb..f8306a2 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -100,6 +100,15 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         "Any field in the JSON document that cannot be mapped to a column in 
the database is ignored");
     static final AllowableValue FAIL_UNMATCHED_FIELD = new 
AllowableValue("Fail", "Fail",
         "If the JSON document has any field that cannot be mapped to a column 
in the database, the FlowFile will be routed to the failure relationship");
+    static final AllowableValue IGNORE_UNMATCHED_COLUMN = new 
AllowableValue("Ignore Unmatched Columns",
+            "Ignore Unmatched Columns",
+            "Any column in the database that does not have a field in the JSON 
document will be assumed to not be required.  No notification will be logged");
+    static final AllowableValue WARNING_UNMATCHED_COLUMN = new 
AllowableValue("Warn on Unmatched Columns",
+            "Warning Unmatched Columns",
+            "Any column in the database that does not have a field in the JSON 
document will be assumed to not be required.  A warning will be logged");
+    static final AllowableValue FAIL_UNMATCHED_COLUMN = new 
AllowableValue("Fail on Unmatched Columns",
+            "Fail on Unmatched Columns",
+            "A flow will fail if any column in the database that does not have 
a field in the JSON document.  An error will be logged");
 
     static final PropertyDescriptor CONNECTION_POOL = new 
PropertyDescriptor.Builder()
             .name("JDBC Connection Pool")
@@ -159,6 +168,12 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             .expressionLanguageSupported(true)
             .build();
 
+    static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new 
PropertyDescriptor.Builder()
+            .name("Unmatched Column Behavior")
+            .description("If an incoming JSON element does not have a field 
mapping for all of the database table's columns, this property specifies how to 
handle the situation")
+            .allowableValues(IGNORE_UNMATCHED_COLUMN, WARNING_UNMATCHED_COLUMN 
,FAIL_UNMATCHED_COLUMN)
+            .defaultValue(FAIL_UNMATCHED_COLUMN.getValue())
+            .build();
 
     static final Relationship REL_ORIGINAL = new Relationship.Builder()
             .name("original")
@@ -193,6 +208,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         properties.add(SCHEMA_NAME);
         properties.add(TRANSLATE_FIELD_NAMES);
         properties.add(UNMATCHED_FIELD_BEHAVIOR);
+        properties.add(UNMATCHED_COLUMN_BEHAVIOR);
         properties.add(UPDATE_KEY);
         return properties;
     }
@@ -233,6 +249,10 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
         final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) 
&& updateKeys == null;
 
+        // Is the unmatched column behaviour fail or warning?
+        final boolean failUnmappedColumns = 
FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+        final boolean warningUnmappedColumns = 
WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+
         // get the database schema from the cache, if one exists. We do this 
in a synchronized block, rather than
         // using a ConcurrentMap because the Map that we are using is a 
LinkedHashMap with a capacity such that if
         // the Map grows beyond this capacity, old elements are evicted. We do 
this in order to avoid filling the
@@ -309,9 +329,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                 final String fqTableName = tableNameBuilder.toString();
 
                 if (INSERT_TYPE.equals(statementType)) {
-                    sql = generateInsert(jsonNode, attributes, fqTableName, 
schema, translateFieldNames, ignoreUnmappedFields);
+                    sql = generateInsert(jsonNode, attributes, fqTableName, 
schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, 
warningUnmappedColumns);
                 } else {
-                    sql = generateUpdate(jsonNode, attributes, fqTableName, 
updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
+                    sql = generateUpdate(jsonNode, attributes, fqTableName, 
updateKeys, schema, translateFieldNames, ignoreUnmappedFields, 
failUnmappedColumns, warningUnmappedColumns);
                 }
             } catch (final ProcessException pe) {
                 getLogger().error("Failed to convert {} to a SQL {} statement 
due to {}; routing to failure",
@@ -359,13 +379,20 @@ public class ConvertJSONToSQL extends AbstractProcessor {
     }
 
     private String generateInsert(final JsonNode rootNode, final Map<String, 
String> attributes, final String tableName,
-        final TableSchema schema, final boolean translateFieldNames, final 
boolean ignoreUnmappedFields) {
+        final TableSchema schema, final boolean translateFieldNames, final 
boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
+        final boolean warningUnmappedColumns) {
 
         final Set<String> normalizedFieldNames = 
getNormalizedColumnNames(rootNode, translateFieldNames);
         for (final String requiredColName : schema.getRequiredColumnNames()) {
             final String normalizedColName = 
normalizeColumnName(requiredColName, translateFieldNames);
             if (!normalizedFieldNames.contains(normalizedColName)) {
-                throw new ProcessException("JSON does not have a value for the 
Required column '" + requiredColName + "'");
+                String missingColMessage = "JSON does not have a value for the 
Required column '" + requiredColName + "'";
+                if (failUnmappedColumns) {
+                    getLogger().error(missingColMessage);
+                    throw new ProcessException(missingColMessage);
+                } else if (warningUnmappedColumns) {
+                    getLogger().warn(missingColMessage);
+                }
             }
         }
 
@@ -426,7 +453,8 @@ public class ConvertJSONToSQL extends AbstractProcessor {
     }
 
     private String generateUpdate(final JsonNode rootNode, final Map<String, 
String> attributes, final String tableName, final String updateKeys,
-        final TableSchema schema, final boolean translateFieldNames, final 
boolean ignoreUnmappedFields) {
+        final TableSchema schema, final boolean translateFieldNames, final 
boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
+        final boolean warningUnmappedColumns) {
 
         final Set<String> updateKeyNames;
         if (updateKeys == null) {
@@ -456,7 +484,14 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             normalizedUpdateNames.add(normalizedUK);
 
             if (!normalizedFieldNames.contains(normalizedUK)) {
-                throw new ProcessException("JSON does not have a value for the 
" + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
+                String missingColMessage = "JSON does not have a value for the 
"
+                        + (updateKeys == null ? "Primary" : "Update") + "Key 
column '" + uk + "'";
+                if (failUnmappedColumns) {
+                    getLogger().error(missingColMessage);
+                    throw new ProcessException(missingColMessage);
+                } else if (warningUnmappedColumns) {
+                    getLogger().warn(missingColMessage);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/01268002/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
index 7422dbc..f4d5b96 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -400,6 +400,210 @@ public class TestConvertJSONToSQL {
         runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
     }
 
+    @Test
+    public void testInsertWithMissingColumnFail()
+            throws InitializationException, ProcessException, SQLException, 
IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(
+                        "CREATE TABLE PERSONS (id integer, name varchar(100), 
code integer, generated_key integer primary key)");
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail 
on Unmatched Columns");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+    } // End testInsertWithMissingColumnFail()
+
+    @Test
+    public void testInsertWithMissingColumnWarning()
+            throws InitializationException, ProcessException, SQLException, 
IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(
+                        "CREATE TABLE PERSONS (id integer, name varchar(100), 
code integer, generated_key integer primary key)");
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warn 
on Unmatched Columns");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES 
(?, ?, ?)");
+    } // End testInsertWithMissingColumnWarning()
+
+    @Test
+    public void testInsertWithMissingColumnIgnore()
+            throws InitializationException, ProcessException, SQLException, 
IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(
+                        "CREATE TABLE PERSONS (id integer, name varchar(100), 
code integer, generated_key integer primary key)");
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Ignore 
Unmatched Columns");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES 
(?, ?, ?)");
+    } // End testInsertWithMissingColumnIgnore()
+
+    @Test
+    public void testUpdateWithMissingColumnFail()
+            throws InitializationException, ProcessException, SQLException, 
IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code, extra");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail 
on Unmatched Columns");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+    } // End testUpdateWithMissingColumnFail()
+
+    @Test
+    public void testUpdateWithMissingColumnWarning()
+            throws InitializationException, ProcessException, SQLException, 
IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code, extra");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warn 
on Unmatched Columns");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND 
CODE = ?");
+
+    } // End testUpdateWithMissingColumnWarning()
+
+    @Test
+    public void testUpdateWithMissingColumnIgnore()
+            throws InitializationException, ProcessException, SQLException, 
IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code, extra");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Ignore 
Unmatched Columns");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND 
CODE = ?");
+
+    } // End testUpdateWithMissingColumnIgnore()
+
 
     /**
      * Simple implementation only for testing purposes

Reply via email to