Repository: nifi
Updated Branches:
  refs/heads/master 027fbf48b -> 46fa6a167


NIFI-3429 Quoting table identifier behind property

This closes #1464.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/46fa6a16
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/46fa6a16
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/46fa6a16

Branch: refs/heads/master
Commit: 46fa6a167ade701cd9dfa83180615226b2f182da
Parents: 027fbf4
Author: Nick Carenza <[email protected]>
Authored: Wed Feb 1 17:50:34 2017 -0800
Committer: Pierre Villard <[email protected]>
Committed: Tue Feb 7 16:48:38 2017 +0100

----------------------------------------------------------------------
 .../processors/standard/ConvertJSONToSQL.java   | 64 ++++++++++++-----
 .../standard/TestConvertJSONToSQL.java          | 74 ++++++++++++++++++++
 2 files changed, 121 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/46fa6a16/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 1ce6fb5..4d8f462 100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -183,13 +183,22 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
     static final PropertyDescriptor QUOTED_IDENTIFIERS = new 
PropertyDescriptor.Builder()
             .name("jts-quoted-identifiers")
-            .displayName("Quote Identifiers")
+            .displayName("Quote Column Identifiers")
             .description("Enabling this option will cause all column names to 
be quoted, allowing you to "
                     + "use reserved words as column names in your tables.")
             .allowableValues("true", "false")
             .defaultValue("false")
             .build();
 
+    static final PropertyDescriptor QUOTED_TABLE_IDENTIFIER = new 
PropertyDescriptor.Builder()
+            .name("jts-quoted-table-identifiers")
+            .displayName("Quote Table Identifiers")
+            .description("Enabling this option will cause the table name to be 
quoted to support the "
+                    + "use of special characters in the table name")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
     static final Relationship REL_ORIGINAL = new Relationship.Builder()
             .name("original")
             .description("When a FlowFile is converted to SQL, the original 
JSON FlowFile is routed to this relationship")
@@ -226,6 +235,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         properties.add(UNMATCHED_COLUMN_BEHAVIOR);
         properties.add(UPDATE_KEY);
         properties.add(QUOTED_IDENTIFIERS);
+        properties.add(QUOTED_TABLE_IDENTIFIER);
         return properties;
     }
 
@@ -272,6 +282,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         //Escape column names?
         final boolean escapeColumnNames = 
context.getProperty(QUOTED_IDENTIFIERS).asBoolean();
 
+        // Quote table name?
+        final boolean quoteTableName = 
context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean();
+
         // get the database schema from the cache, if one exists. We do this 
in a synchronized block, rather than
         // using a ConcurrentMap because the Map that we are using is a 
LinkedHashMap with a capacity such that if
         // the Map grows beyond this capacity, old elements are evicted. We do 
this in order to avoid filling the
@@ -349,10 +362,10 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
                 if (INSERT_TYPE.equals(statementType)) {
                     sql = generateInsert(jsonNode, attributes, fqTableName, 
schema, translateFieldNames, ignoreUnmappedFields,
-                            failUnmappedColumns, warningUnmappedColumns, 
escapeColumnNames);
+                            failUnmappedColumns, warningUnmappedColumns, 
escapeColumnNames, quoteTableName);
                 } else {
                     sql = generateUpdate(jsonNode, attributes, fqTableName, 
updateKeys, schema, translateFieldNames, ignoreUnmappedFields,
-                            failUnmappedColumns, warningUnmappedColumns, 
escapeColumnNames);
+                            failUnmappedColumns, warningUnmappedColumns, 
escapeColumnNames, quoteTableName);
                 }
             } catch (final ProcessException pe) {
                 getLogger().error("Failed to convert {} to a SQL {} statement 
due to {}; routing to failure",
@@ -402,7 +415,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
     private String generateInsert(final JsonNode rootNode, final Map<String, 
String> attributes, final String tableName,
                                   final TableSchema schema, final boolean 
translateFieldNames, final boolean ignoreUnmappedFields, final boolean 
failUnmappedColumns,
-                                  final boolean warningUnmappedColumns, 
boolean escapeColumnNames) {
+                                  final boolean warningUnmappedColumns, 
boolean escapeColumnNames, boolean quoteTableName) {
 
         final Set<String> normalizedFieldNames = 
getNormalizedColumnNames(rootNode, translateFieldNames);
         for (final String requiredColName : schema.getRequiredColumnNames()) {
@@ -420,7 +433,15 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
         final StringBuilder sqlBuilder = new StringBuilder();
         int fieldCount = 0;
-        sqlBuilder.append("INSERT INTO ").append(tableName).append(" (");
+        sqlBuilder.append("INSERT INTO ");
+        if (quoteTableName) {
+            sqlBuilder.append(schema.getQuotedIdentifierString())
+                .append(tableName)
+                .append(schema.getQuotedIdentifierString());
+        } else {
+            sqlBuilder.append(tableName);
+        }
+        sqlBuilder.append(" (");
 
         // iterate over all of the elements in the JSON, building the SQL 
statement by adding the column names, as well as
         // adding the column value to a "sql.args.N.value" attribute and the 
type of a "sql.args.N.type" attribute add the
@@ -439,12 +460,12 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                     sqlBuilder.append(", ");
                 }
 
-                if(!escapeColumnNames){
-                    sqlBuilder.append(desc.getColumnName());
+                if(escapeColumnNames){
+                    sqlBuilder.append(schema.getQuotedIdentifierString())
+                        .append(desc.getColumnName())
+                        .append(schema.getQuotedIdentifierString());
                 } else {
-                    sqlBuilder.append(schema.getQuotedIdentifierString());
                     sqlBuilder.append(desc.getColumnName());
-                    sqlBuilder.append(schema.getQuotedIdentifierString());
                 }
 
                 final int sqlType = desc.getDataType();
@@ -482,7 +503,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
     private String generateUpdate(final JsonNode rootNode, final Map<String, 
String> attributes, final String tableName, final String updateKeys,
                                   final TableSchema schema, final boolean 
translateFieldNames, final boolean ignoreUnmappedFields, final boolean 
failUnmappedColumns,
-                                  final boolean warningUnmappedColumns, 
boolean escapeColumnNames) {
+                                  final boolean warningUnmappedColumns, 
boolean escapeColumnNames, boolean quoteTableName) {
 
         final Set<String> updateKeyNames;
         if (updateKeys == null) {
@@ -500,7 +521,16 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
         final StringBuilder sqlBuilder = new StringBuilder();
         int fieldCount = 0;
-        sqlBuilder.append("UPDATE ").append(tableName).append(" SET ");
+        sqlBuilder.append("UPDATE ");
+        if (quoteTableName) {
+            sqlBuilder.append(schema.getQuotedIdentifierString())
+                .append(tableName)
+                .append(schema.getQuotedIdentifierString());
+        } else {
+            sqlBuilder.append(tableName);
+        }
+
+        sqlBuilder.append(" SET ");
 
 
         // Create a Set of all normalized Update Key names, and ensure that 
there is a field in the JSON
@@ -549,12 +579,12 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                 sqlBuilder.append(", ");
             }
 
-            if(!escapeColumnNames){
-                sqlBuilder.append(desc.getColumnName());
-            } else {
+            if(escapeColumnNames){
                 sqlBuilder.append(schema.getQuotedIdentifierString())
                             .append(desc.getColumnName())
                             .append(schema.getQuotedIdentifierString());
+            } else {
+                sqlBuilder.append(desc.getColumnName());
             }
 
             sqlBuilder.append(" = ?");
@@ -598,12 +628,12 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             }
             fieldCount++;
 
-            if(!escapeColumnNames){
-                sqlBuilder.append(normalizedColName);
-            } else {
+            if(escapeColumnNames){
                 sqlBuilder.append(schema.getQuotedIdentifierString())
                         .append(normalizedColName)
                         .append(schema.getQuotedIdentifierString());
+            } else {
+                sqlBuilder.append(normalizedColName);
             }
             sqlBuilder.append(" = ?");
             final int sqlType = desc.getDataType();

http://git-wip-us.apache.org/repos/asf/nifi/blob/46fa6a16/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
index f808072..cf1459c 100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -124,6 +124,43 @@ public class TestConvertJSONToSQL {
     }
 
     @Test
+    public void testInsertQuotedTableIdentifier() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.QUOTED_TABLE_IDENTIFIER, "true");
+
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(),
 "1");
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("INSERT INTO \"PERSONS\" (ID, NAME, CODE) 
VALUES (?, ?, ?)");
+    }
+
+    @Test
     public void testInsertWithNullValue() throws InitializationException, 
ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
         final File tempDir = folder.getRoot();
@@ -196,6 +233,43 @@ public class TestConvertJSONToSQL {
 
 
     @Test
+    public void testUpdateQuotedTableIdentifier() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.QUOTED_TABLE_IDENTIFIER, "true");
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(),
 "1");
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.1.value", "Mark");
+        out.assertAttributeEquals("sql.args.2.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeNotExists("sql.args.2.value");
+        out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "1");
+
+        out.assertContentEquals("UPDATE \"PERSONS\" SET NAME = ?, CODE = ? 
WHERE ID = ?");
+    }
+
+
+    @Test
     public void testMultipleInserts() throws InitializationException, 
ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
         final File tempDir = folder.getRoot();

Reply via email to