Repository: nifi
Updated Branches:
  refs/heads/master 17718940d -> 612675e42


NIFI-4684 - Added SQL Parameter Attribute Prefix property to ConvertJSONToSQL

Signed-off-by: Pierre Villard <[email protected]>

This closes #2333.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/612675e4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/612675e4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/612675e4

Branch: refs/heads/master
Commit: 612675e4283a22481ee76a0ad81a092f500150a9
Parents: 1771894
Author: Matthew Burgess <[email protected]>
Authored: Fri Dec 8 22:01:25 2017 -0500
Committer: Pierre Villard <[email protected]>
Committed: Tue Dec 19 10:32:52 2017 +0100

----------------------------------------------------------------------
 .../processors/standard/ConvertJSONToSQL.java   | 71 ++++++++++++--------
 .../standard/TestConvertJSONToSQL.java          | 29 ++++++++
 2 files changed, 73 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/612675e4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 00db3b5..638ec9d 100755
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -83,9 +83,10 @@ import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttribu
         + "relationship and the SQL is routed to the 'sql' relationship.")
 @WritesAttributes({
         @WritesAttribute(attribute="mime.type", description="Sets mime.type of 
FlowFile that is routed to 'sql' to 'text/plain'."),
-        @WritesAttribute(attribute="sql.table", description="Sets the 
sql.table attribute of FlowFile that is routed to 'sql' to the name of the 
table that is updated by the SQL statement."),
-        @WritesAttribute(attribute="sql.catalog", description="If the Catalog 
name is set for this database, specifies the name of the catalog that the SQL 
statement will update. "
-                + "If no catalog is used, this attribute will not be added."),
+        @WritesAttribute(attribute = "<sql>.table", description = "Sets the 
<sql>.table attribute of FlowFile that is routed to 'sql' to the name of the 
table that is updated by the SQL statement. "
+                + "The prefix for this attribute ('sql', e.g.) is determined 
by the SQL Parameter Attribute Prefix property."),
+        @WritesAttribute(attribute="<sql>.catalog", description="If the 
Catalog name is set for this database, specifies the name of the catalog that 
the SQL statement will update. "
+                + "If no catalog is used, this attribute will not be added. 
The prefix for this attribute ('sql', e.g.) is determined by the SQL Parameter 
Attribute Prefix property."),
         @WritesAttribute(attribute="fragment.identifier", description="All 
FlowFiles routed to the 'sql' relationship for the same incoming FlowFile 
(multiple will be output for the same incoming "
                 + "FlowFile if the incoming FlowFile is a JSON Array) will 
have the same value for the fragment.identifier attribute. This can then be 
used to correlate the results."),
         @WritesAttribute(attribute="fragment.count", description="The number 
of SQL FlowFiles that were produced for same incoming FlowFile. This can be 
used in conjunction with the "
@@ -93,12 +94,14 @@ import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttribu
         @WritesAttribute(attribute="fragment.index", description="The position 
of this FlowFile in the list of outgoing FlowFiles that were all derived from 
the same incoming FlowFile. This can be "
                 + "used in conjunction with the fragment.identifier and 
fragment.count attributes to know which FlowFiles originated from the same 
incoming FlowFile and in what order the SQL "
                 + "FlowFiles were produced"),
-        @WritesAttribute(attribute="sql.args.N.type", description="The output 
SQL statements are parametrized in order to avoid SQL Injection Attacks. The 
types of the Parameters "
-                + "to use are stored in attributes named sql.args.1.type, 
sql.args.2.type, sql.args.3.type, and so on. The type is a number representing 
a JDBC Type constant. "
-                + "Generally, this is useful only for software to read and 
interpret but is added so that a processor such as PutSQL can understand how to 
interpret the values."),
-        @WritesAttribute(attribute="sql.args.N.value", description="The output 
SQL statements are parametrized in order to avoid SQL Injection Attacks. The 
values of the Parameters "
+        @WritesAttribute(attribute="<sql>.args.N.type", description="The 
output SQL statements are parametrized in order to avoid SQL Injection Attacks. 
The types of the Parameters "
+                + "to use are stored in attributes named <sql>.args.1.type, 
<sql>.args.2.type, <sql>.args.3.type, and so on. The type is a number 
representing a JDBC Type constant. "
+                + "Generally, this is useful only for software to read and 
interpret but is added so that a processor such as PutSQL can understand how to 
interpret the values. "
+                + "The prefix for this attribute ('sql', e.g.) is determined 
by the SQL Parameter Attribute Prefix property."),
+        @WritesAttribute(attribute="<sql>.args.N.value", description="The 
output SQL statements are parametrized in order to avoid SQL Injection Attacks. 
The values of the Parameters "
                 + "to use are stored in the attributes named sql.args.1.value, 
sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a 
corresponding "
-                + "sql.args.N.type attribute that indicates how the value 
should be interpreted when inserting it into the database.")
+                + "<sql>.args.N.type attribute that indicates how the value 
should be interpreted when inserting it into the database. "
+                + "The prefix for this attribute ('sql', e.g.) is determined 
by the SQL Parameter Attribute Prefix property.")
 })
 public class ConvertJSONToSQL extends AbstractProcessor {
     private static final String UPDATE_TYPE = "UPDATE";
@@ -201,6 +204,16 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             .defaultValue("false")
             .build();
 
+    static final PropertyDescriptor SQL_PARAM_ATTR_PREFIX = new 
PropertyDescriptor.Builder()
+            .name("jts-sql-param-attr-prefix")
+            .displayName("SQL Parameter Attribute Prefix")
+            .description("The string to be prepended to the outgoing flow file 
attributes, such as <sql>.args.1.value, where <sql> is replaced with the 
specified value")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .required(true)
+            .defaultValue("sql")
+            .build();
+
     static final Relationship REL_ORIGINAL = new Relationship.Builder()
             .name("original")
             .description("When a FlowFile is converted to SQL, the original 
JSON FlowFile is routed to this relationship")
@@ -238,6 +251,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         properties.add(UPDATE_KEY);
         properties.add(QUOTED_IDENTIFIERS);
         properties.add(QUOTED_TABLE_IDENTIFIER);
+        properties.add(SQL_PARAM_ATTR_PREFIX);
         return properties;
     }
 
@@ -287,6 +301,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         // Quote table name?
         final boolean quoteTableName = 
context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean();
 
+        // Attribute prefix
+        final String attributePrefix = 
context.getProperty(SQL_PARAM_ATTR_PREFIX).evaluateAttributeExpressions(flowFile).getValue();
+
         // get the database schema from the cache, if one exists. We do this 
in a synchronized block, rather than
         // using a ConcurrentMap because the Map that we are using is a 
LinkedHashMap with a capacity such that if
         // the Map grows beyond this capacity, old elements are evicted. We do 
this in order to avoid filling the
@@ -364,13 +381,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
                 if (INSERT_TYPE.equals(statementType)) {
                     sql = generateInsert(jsonNode, attributes, fqTableName, 
schema, translateFieldNames, ignoreUnmappedFields,
-                            failUnmappedColumns, warningUnmappedColumns, 
escapeColumnNames, quoteTableName);
+                            failUnmappedColumns, warningUnmappedColumns, 
escapeColumnNames, quoteTableName, attributePrefix);
                 } else if (UPDATE_TYPE.equals(statementType)) {
                     sql = generateUpdate(jsonNode, attributes, fqTableName, 
updateKeys, schema, translateFieldNames, ignoreUnmappedFields,
-                            failUnmappedColumns, warningUnmappedColumns, 
escapeColumnNames, quoteTableName);
+                            failUnmappedColumns, warningUnmappedColumns, 
escapeColumnNames, quoteTableName, attributePrefix);
                 } else {
                     sql = generateDelete(jsonNode, attributes, fqTableName, 
schema, translateFieldNames, ignoreUnmappedFields,
-                            failUnmappedColumns, warningUnmappedColumns, 
escapeColumnNames, quoteTableName);
+                            failUnmappedColumns, warningUnmappedColumns, 
escapeColumnNames, quoteTableName, attributePrefix);
                 }
             } catch (final ProcessException pe) {
                 getLogger().error("Failed to convert {} to a SQL {} statement 
due to {}; routing to failure",
@@ -391,13 +408,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             });
 
             attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
-            attributes.put("sql.table", tableName);
+            attributes.put(attributePrefix + ".table", tableName);
             attributes.put(FRAGMENT_ID.key(), fragmentIdentifier);
             attributes.put(FRAGMENT_COUNT.key(), 
String.valueOf(arrayNode.size()));
             attributes.put(FRAGMENT_INDEX.key(), String.valueOf(i));
 
             if (catalog != null) {
-                attributes.put("sql.catalog", catalog);
+                attributes.put(attributePrefix + ".catalog", catalog);
             }
 
             sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes);
@@ -420,7 +437,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
     private String generateInsert(final JsonNode rootNode, final Map<String, 
String> attributes, final String tableName,
                                   final TableSchema schema, final boolean 
translateFieldNames, final boolean ignoreUnmappedFields, final boolean 
failUnmappedColumns,
-                                  final boolean warningUnmappedColumns, 
boolean escapeColumnNames, boolean quoteTableName) {
+                                  final boolean warningUnmappedColumns, 
boolean escapeColumnNames, boolean quoteTableName, final String 
attributePrefix) {
 
         final Set<String> normalizedFieldNames = 
getNormalizedColumnNames(rootNode, translateFieldNames);
         for (final String requiredColName : schema.getRequiredColumnNames()) {
@@ -449,7 +466,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         sqlBuilder.append(" (");
 
         // iterate over all of the elements in the JSON, building the SQL 
statement by adding the column names, as well as
-        // adding the column value to a "sql.args.N.value" attribute and the 
type of a "sql.args.N.type" attribute add the
+        // adding the column value to a "<sql>.args.N.value" attribute and the 
type of a "<sql>.args.N.type" attribute add the
         // columns that we are inserting into
         final Iterator<String> fieldNames = rootNode.getFieldNames();
         while (fieldNames.hasNext()) {
@@ -474,13 +491,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                 }
 
                 final int sqlType = desc.getDataType();
-                attributes.put("sql.args." + fieldCount + ".type", 
String.valueOf(sqlType));
+                attributes.put(attributePrefix + ".args." + fieldCount + 
".type", String.valueOf(sqlType));
 
                 final Integer colSize = desc.getColumnSize();
                 final JsonNode fieldNode = rootNode.get(fieldName);
                 if (!fieldNode.isNull()) {
                     String fieldValue = createSqlStringValue(fieldNode, 
colSize, sqlType);
-                    attributes.put("sql.args." + fieldCount + ".value", 
fieldValue);
+                    attributes.put(attributePrefix + ".args." + fieldCount + 
".value", fieldValue);
                 }
             }
         }
@@ -562,7 +579,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
     private String generateUpdate(final JsonNode rootNode, final Map<String, 
String> attributes, final String tableName, final String updateKeys,
                                   final TableSchema schema, final boolean 
translateFieldNames, final boolean ignoreUnmappedFields, final boolean 
failUnmappedColumns,
-                                  final boolean warningUnmappedColumns, 
boolean escapeColumnNames, boolean quoteTableName) {
+                                  final boolean warningUnmappedColumns, 
boolean escapeColumnNames, boolean quoteTableName, final String 
attributePrefix) {
 
         final Set<String> updateKeyNames;
         if (updateKeys == null) {
@@ -612,7 +629,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         }
 
         // iterate over all of the elements in the JSON, building the SQL 
statement by adding the column names, as well as
-        // adding the column value to a "sql.args.N.value" attribute and the 
type of a "sql.args.N.type" attribute add the
+        // adding the column value to a "<sql>.args.N.value" attribute and the 
type of a "<sql>.args.N.type" attribute add the
         // columns that we are inserting into
         Iterator<String> fieldNames = rootNode.getFieldNames();
         while (fieldNames.hasNext()) {
@@ -648,14 +665,14 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
             sqlBuilder.append(" = ?");
             final int sqlType = desc.getDataType();
-            attributes.put("sql.args." + fieldCount + ".type", 
String.valueOf(sqlType));
+            attributes.put(attributePrefix + ".args." + fieldCount + ".type", 
String.valueOf(sqlType));
 
             final Integer colSize = desc.getColumnSize();
 
             final JsonNode fieldNode = rootNode.get(fieldName);
             if (!fieldNode.isNull()) {
                 String fieldValue = createSqlStringValue(fieldNode, colSize, 
sqlType);
-                attributes.put("sql.args." + fieldCount + ".value", 
fieldValue);
+                attributes.put(attributePrefix + ".args." + fieldCount + 
".value", fieldValue);
             }
         }
 
@@ -693,14 +710,14 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             }
             sqlBuilder.append(" = ?");
             final int sqlType = desc.getDataType();
-            attributes.put("sql.args." + fieldCount + ".type", 
String.valueOf(sqlType));
+            attributes.put(attributePrefix + ".args." + fieldCount + ".type", 
String.valueOf(sqlType));
 
             final Integer colSize = desc.getColumnSize();
             String fieldValue = rootNode.get(fieldName).asText();
             if (colSize != null && fieldValue.length() > colSize) {
                 fieldValue = fieldValue.substring(0, colSize);
             }
-            attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+            attributes.put(attributePrefix + ".args." + fieldCount + ".value", 
fieldValue);
         }
 
         return sqlBuilder.toString();
@@ -708,7 +725,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
     private String generateDelete(final JsonNode rootNode, final Map<String, 
String> attributes, final String tableName,
                                   final TableSchema schema, final boolean 
translateFieldNames, final boolean ignoreUnmappedFields, final boolean 
failUnmappedColumns,
-                                  final boolean warningUnmappedColumns, 
boolean escapeColumnNames, boolean quoteTableName) {
+                                  final boolean warningUnmappedColumns, 
boolean escapeColumnNames, boolean quoteTableName, final String 
attributePrefix) {
         final Set<String> normalizedFieldNames = 
getNormalizedColumnNames(rootNode, translateFieldNames);
         for (final String requiredColName : schema.getRequiredColumnNames()) {
             final String normalizedColName = 
normalizeColumnName(requiredColName, translateFieldNames);
@@ -737,7 +754,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         sqlBuilder.append(" WHERE ");
 
         // iterate over all of the elements in the JSON, building the SQL 
statement by adding the column names, as well as
-        // adding the column value to a "sql.args.N.value" attribute and the 
type of a "sql.args.N.type" attribute add the
+        // adding the column value to a "<sql>.args.N.value" attribute and the 
type of a "<sql>.args.N.type" attribute add the
         // columns that we are inserting into
         final Iterator<String> fieldNames = rootNode.getFieldNames();
         while (fieldNames.hasNext()) {
@@ -763,7 +780,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                 sqlBuilder.append(" = ?");
 
                 final int sqlType = desc.getDataType();
-                attributes.put("sql.args." + fieldCount + ".type", 
String.valueOf(sqlType));
+                attributes.put(attributePrefix + ".args." + fieldCount + 
".type", String.valueOf(sqlType));
 
                 final Integer colSize = desc.getColumnSize();
                 final JsonNode fieldNode = rootNode.get(fieldName);
@@ -772,7 +789,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                     if (colSize != null && fieldValue.length() > colSize) {
                         fieldValue = fieldValue.substring(0, colSize);
                     }
-                    attributes.put("sql.args." + fieldCount + ".value", 
fieldValue);
+                    attributes.put(attributePrefix + ".args." + fieldCount + 
".value", fieldValue);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/612675e4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
index 2332e28..8a03a81 100755
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -874,6 +874,35 @@ public class TestConvertJSONToSQL {
         out.assertContentEquals("DELETE FROM PERSONS WHERE ID = ? AND NAME = ? 
AND CODE = ?");
     }
 
+    @Test
+    public void testAttributePrefix() throws InitializationException, 
ProcessException, SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true");
+        runner.setProperty(ConvertJSONToSQL.SQL_PARAM_ATTR_PREFIX, "hiveql");
+
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(),
 "1");
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("hiveql.args.1.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("hiveql.args.1.value", "1");
+        out.assertAttributeEquals("hiveql.args.2.type", 
String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("hiveql.args.2.value", "Mark");
+        out.assertAttributeEquals("hiveql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("hiveql.args.3.value", "48");
+
+        out.assertContentEquals("INSERT INTO PERSONS (\"ID\", \"NAME\", 
\"CODE\") VALUES (?, ?, ?)");
+    }
+
     /**
      * Simple implementation only for testing purposes
      */

Reply via email to