Repository: nifi
Updated Branches:
  refs/heads/master 4c0cf7d72 -> dc4004de6


NIFI-977: Allow SQL Data Types with numerals that are negative


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/84db3725
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/84db3725
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/84db3725

Branch: refs/heads/master
Commit: 84db3725386fe44bbef8a18d84852a4f716addb6
Parents: 49ee06b
Author: Mark Payne <[email protected]>
Authored: Wed Oct 14 13:12:10 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Wed Oct 14 13:12:10 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/PutSQL.java | 156 +++++++++----------
 .../nifi/processors/standard/TestPutSQL.java    |  46 +++++-
 2 files changed, 119 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/84db3725/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index b087737..5c2bbc2 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -70,81 +70,81 @@ import org.apache.nifi.stream.io.StreamUtils;
 @SeeAlso(ConvertJSONToSQL.class)
 @Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
 @CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content 
of an incoming FlowFile is expected to be the SQL command "
-        + "to execute. The SQL command may use the ? to escape parameters. In 
this case, the parameters to use must exist as FlowFile attributes "
-        + "with the naming convention sql.args.N.type and sql.args.N.value, 
where N is a positive integer. The sql.args.N.type is expected to be "
-        + "a number indicating the JDBC Type. The content of the FlowFile is 
expected to be in UTF-8 format.")
+    + "to execute. The SQL command may use the ? to escape parameters. In this 
case, the parameters to use must exist as FlowFile attributes "
+    + "with the naming convention sql.args.N.type and sql.args.N.value, where 
N is a positive integer. The sql.args.N.type is expected to be "
+    + "a number indicating the JDBC Type. The content of the FlowFile is 
expected to be in UTF-8 format.")
 @ReadsAttributes({
-    @ReadsAttribute(attribute="fragment.identifier", description="If the 
<Support Fragment Transactions> property is true, this attribute is used to 
determine whether or "
-            + "not two FlowFiles belong to the same transaction."),
-    @ReadsAttribute(attribute="fragment.count", description="If the <Support 
Fragment Transactions> property is true, this attribute is used to determine 
how many FlowFiles "
-            + "are needed to complete the transaction."),
-    @ReadsAttribute(attribute="fragment.index", description="If the <Support 
Fragment Transactions> property is true, this attribute is used to determine 
the order that the FlowFiles "
-            + "in a transaction should be evaluated."),
-    @ReadsAttribute(attribute="sql.args.N.type", description="Incoming 
FlowFiles are expected to be parameterized SQL statements. The type of each 
Parameter is specified as an integer "
-            + "that represents the JDBC Type of the parameter."),
-    @ReadsAttribute(attribute="sql.args.N.value", description="Incoming 
FlowFiles are expected to be parameterized SQL statements. The value of the 
Parameters are specified as "
-            + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so 
on. The type of the sql.args.1.value Parameter is specified by the 
sql.args.1.type attribute.")
+    @ReadsAttribute(attribute = "fragment.identifier", description = "If the 
<Support Fragment Transactions> property is true, this attribute is used to 
determine whether or "
+        + "not two FlowFiles belong to the same transaction."),
+    @ReadsAttribute(attribute = "fragment.count", description = "If the 
<Support Fragment Transactions> property is true, this attribute is used to 
determine how many FlowFiles "
+        + "are needed to complete the transaction."),
+    @ReadsAttribute(attribute = "fragment.index", description = "If the 
<Support Fragment Transactions> property is true, this attribute is used to 
determine the order that the FlowFiles "
+        + "in a transaction should be evaluated."),
+    @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming 
FlowFiles are expected to be parameterized SQL statements. The type of each 
Parameter is specified as an integer "
+        + "that represents the JDBC Type of the parameter."),
+    @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming 
FlowFiles are expected to be parameterized SQL statements. The value of the 
Parameters are specified as "
+        + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. 
The type of the sql.args.1.value Parameter is specified by the sql.args.1.type 
attribute.")
 })
 @WritesAttributes({
-    @WritesAttribute(attribute="sql.generated.key", description="If the 
database generated a key for an INSERT statement and the Obtain Generated Keys 
property is set to true, "
-            + "this attribute will be added to indicate the generated key, if 
possible. This feature is not supported by all database vendors.")
+    @WritesAttribute(attribute = "sql.generated.key", description = "If the 
database generated a key for an INSERT statement and the Obtain Generated Keys 
property is set to true, "
+        + "this attribute will be added to indicate the generated key, if 
possible. This feature is not supported by all database vendors.")
 })
 public class PutSQL extends AbstractProcessor {
 
     static final PropertyDescriptor CONNECTION_POOL = new 
PropertyDescriptor.Builder()
-            .name("JDBC Connection Pool")
-            .description("Specifies the JDBC Connection Pool to use in order 
to convert the JSON message to a SQL statement. "
-                    + "The Connection Pool is necessary in order to determine 
the appropriate database column types.")
-            .identifiesControllerService(DBCPService.class)
-            .required(true)
-            .build();
+        .name("JDBC Connection Pool")
+        .description("Specifies the JDBC Connection Pool to use in order to 
convert the JSON message to a SQL statement. "
+            + "The Connection Pool is necessary in order to determine the 
appropriate database column types.")
+        .identifiesControllerService(DBCPService.class)
+        .required(true)
+        .build();
     static final PropertyDescriptor SUPPORT_TRANSACTIONS = new 
PropertyDescriptor.Builder()
-            .name("Support Fragmented Transactions")
-            .description("If true, when a FlowFile is consumed by this 
Processor, the Processor will first check the fragment.identifier and 
fragment.count attributes of that FlowFile. "
-                    + "If the fragment.count value is greater than 1, the 
Processor will not process any FlowFile will that fragment.identifier until all 
are available; "
-                    + "at that point, it will process all FlowFiles with that 
fragment.identifier as a single transaction, in the order specified by the 
FlowFiles' fragment.index attributes. "
-                    + "This Provides atomicity of those SQL statements. If 
this value is false, these attributes will be ignored and the updates will 
occur independent of one another.")
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .build();
+        .name("Support Fragmented Transactions")
+        .description("If true, when a FlowFile is consumed by this Processor, 
the Processor will first check the fragment.identifier and fragment.count 
attributes of that FlowFile. "
+            + "If the fragment.count value is greater than 1, the Processor 
will not process any FlowFile will that fragment.identifier until all are 
available; "
+            + "at that point, it will process all FlowFiles with that 
fragment.identifier as a single transaction, in the order specified by the 
FlowFiles' fragment.index attributes. "
+            + "This Provides atomicity of those SQL statements. If this value 
is false, these attributes will be ignored and the updates will occur 
independent of one another.")
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .build();
     static final PropertyDescriptor TRANSACTION_TIMEOUT = new 
PropertyDescriptor.Builder()
-            .name("Transaction Timeout")
-            .description("If the <Support Fragmented Transactions> property is 
set to true, specifies how long to wait for all FlowFiles for a particular 
fragment.identifier attribute "
-                    + "to arrive before just transferring all of the FlowFiles 
with that identifier to the 'failure' relationship")
-            .required(false)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .build();
+        .name("Transaction Timeout")
+        .description("If the <Support Fragmented Transactions> property is set 
to true, specifies how long to wait for all FlowFiles for a particular 
fragment.identifier attribute "
+            + "to arrive before just transferring all of the FlowFiles with 
that identifier to the 'failure' relationship")
+        .required(false)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .build();
     static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Batch Size")
-            .description("The preferred number of FlowFiles to put to the 
database in a single transaction")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .defaultValue("100")
-            .build();
+        .name("Batch Size")
+        .description("The preferred number of FlowFiles to put to the database 
in a single transaction")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("100")
+        .build();
     static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new 
PropertyDescriptor.Builder()
-            .name("Obtain Generated Keys")
-            .description("If true, any key that is automatically generated by 
the database will be added to the FlowFile that generated it using the 
sql.generate.key attribute. "
-                    + "This may result in slightly slower performance and is 
not supported by all databases.")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .build();
+        .name("Obtain Generated Keys")
+        .description("If true, any key that is automatically generated by the 
database will be added to the FlowFile that generated it using the 
sql.generate.key attribute. "
+            + "This may result in slightly slower performance and is not 
supported by all databases.")
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("A FlowFile is routed to this relationship after the 
database is successfully updated")
-            .build();
+        .name("success")
+        .description("A FlowFile is routed to this relationship after the 
database is successfully updated")
+        .build();
     static final Relationship REL_RETRY = new Relationship.Builder()
-            .name("retry")
-            .description("A FlowFile is routed to this relationship if the 
database cannot be updated but attempting the operation again may succeed")
-            .build();
+        .name("retry")
+        .description("A FlowFile is routed to this relationship if the 
database cannot be updated but attempting the operation again may succeed")
+        .build();
     static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("A FlowFile is routed to this relationship if the 
database cannot be updated and retrying the operation will also fail, "
-                    + "such as an invalid query or an integrity constraint 
violation")
-            .build();
+        .name("failure")
+        .description("A FlowFile is routed to this relationship if the 
database cannot be updated and retrying the operation will also fail, "
+            + "such as an invalid query or an integrity constraint violation")
+        .build();
 
     private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = 
Pattern.compile("sql\\.args\\.(\\d+)\\.type");
-    private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
+    private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
 
     private static final String FRAGMENT_ID_ATTR = "fragment.identifier";
     private static final String FRAGMENT_INDEX_ATTR = "fragment.index";
@@ -187,7 +187,7 @@ public class PutSQL extends AbstractProcessor {
         final long startNanos = System.nanoTime();
         final boolean obtainKeys = 
context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean();
         final Map<String, StatementFlowFileEnclosure> statementMap = new 
HashMap<>(); // Map SQL to a PreparedStatement and FlowFiles
-        final List<FlowFile> sentFlowFiles = new ArrayList<>();  // flowfiles 
that have been sent
+        final List<FlowFile> sentFlowFiles = new ArrayList<>(); // flowfiles 
that have been sent
         final List<FlowFile> processedFlowFiles = new ArrayList<>(); // all 
flowfiles that we have processed
         final Set<StatementFlowFileEnclosure> enclosuresToExecute = new 
LinkedHashSet<>(); // the enclosures that we've processed
 
@@ -286,7 +286,7 @@ public class PutSQL extends AbstractProcessor {
                             conn.rollback();
                             final FlowFile offendingFlowFile = 
batchFlowFiles.get(offendingFlowFileIndex);
                             getLogger().error("Failed to update database due 
to a failed batch update. A total of {} FlowFiles are required for this 
transaction, so routing all to failure. "
-                                    + "Offending FlowFile was {}, which caused 
the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, 
e});
+                                + "Offending FlowFile was {}, which caused the 
following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e});
                             session.transfer(flowFiles, REL_FAILURE);
                             return;
                         }
@@ -300,7 +300,7 @@ public class PutSQL extends AbstractProcessor {
                         int failureCount = 0;
                         int successCount = 0;
                         int retryCount = 0;
-                        for (int i=0; i < updateCounts.length; i++) {
+                        for (int i = 0; i < updateCounts.length; i++) {
                             final int updateCount = updateCounts[i];
                             final FlowFile flowFile = batchFlowFiles.get(i);
                             if (updateCount == Statement.EXECUTE_FAILED) {
@@ -329,7 +329,7 @@ public class PutSQL extends AbstractProcessor {
                         }
 
                         getLogger().error("Failed to update database due to a 
failed batch update. There were a total of {} FlowFiles that failed, {} that 
succeeded, "
-                                + "and {} that were not execute and will be 
routed to retry; ", new Object[] {failureCount, successCount, retryCount});
+                            + "and {} that were not execute and will be routed 
to retry; ", new Object[] {failureCount, successCount, retryCount});
                     } catch (final SQLNonTransientException e) {
                         getLogger().error("Failed to update database for {} 
due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e});
 
@@ -339,7 +339,7 @@ public class PutSQL extends AbstractProcessor {
                         continue;
                     } catch (final SQLException e) {
                         getLogger().error("Failed to update database for {} 
due to {}; it is possible that retrying the operation will succeed, so routing 
to retry",
-                                new Object[] {enclosure.getFlowFiles(), e});
+                            new Object[] {enclosure.getFlowFiles(), e});
 
                         for (final FlowFile flowFile : 
enclosure.getFlowFiles()) {
                             destinationRelationships.put(flowFile, REL_RETRY);
@@ -484,7 +484,7 @@ public class PutSQL extends AbstractProcessor {
      *
      * @param stmt the statement that generated a key
      * @return the key that was generated from the given statement, or 
<code>null</code> if no key
-     * was generated or it could not be determined.
+     *         was generated or it could not be determined.
      */
     private String determineGeneratedKey(final PreparedStatement stmt) {
         try {
@@ -514,7 +514,7 @@ public class PutSQL extends AbstractProcessor {
      * @throws SQLException if unable to create the appropriate 
PreparedStatement
      */
     private StatementFlowFileEnclosure getEnclosure(final String sql, final 
Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap,
-            final boolean obtainKeys, final boolean fragmentedTransaction) 
throws SQLException {
+        final boolean obtainKeys, final boolean fragmentedTransaction) throws 
SQLException {
         StatementFlowFileEnclosure enclosure = stmtMap.get(sql);
         if (enclosure != null) {
             return enclosure;
@@ -620,9 +620,9 @@ public class PutSQL extends AbstractProcessor {
      *
      * @param flowFiles the FlowFiles whose relationship is to be determined
      * @param transactionTimeoutMillis the maximum amount of time (in 
milliseconds) that we should wait
-     *              for all FlowFiles in a transaction to be present before 
routing to failure
+     *            for all FlowFiles in a transaction to be present before 
routing to failure
      * @return the appropriate relationship to route the FlowFiles to, or 
<code>null</code> if the FlowFiles
-     *             should instead be processed
+     *         should instead be processed
      */
     Relationship determineRelationship(final List<FlowFile> flowFiles, final 
Long transactionTimeoutMillis) {
         int selectedNumFragments = 0;
@@ -634,7 +634,7 @@ public class PutSQL extends AbstractProcessor {
                 return null;
             } else if (fragmentCount == null) {
                 getLogger().error("Cannot process {} because there are {} 
FlowFiles with the same fragment.identifier "
-                        + "attribute but not all FlowFiles have a 
fragment.count attribute; routing all to failure",  new Object[] {flowFile, 
flowFiles.size()});
+                    + "attribute but not all FlowFiles have a fragment.count 
attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()});
                 return REL_FAILURE;
             }
 
@@ -643,13 +643,13 @@ public class PutSQL extends AbstractProcessor {
                 numFragments = Integer.parseInt(fragmentCount);
             } catch (final NumberFormatException nfe) {
                 getLogger().error("Cannot process {} because the 
fragment.count attribute has a value of '{}', which is not an integer; "
-                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile, fragmentCount});
+                    + "routing all FlowFiles with this fragment.identifier to 
failure", new Object[] {flowFile, fragmentCount});
                 return REL_FAILURE;
             }
 
             if (numFragments < 1) {
                 getLogger().error("Cannot process {} because the 
fragment.count attribute has a value of '{}', which is not a positive integer; "
-                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile, fragmentCount});
+                    + "routing all FlowFiles with this fragment.identifier to 
failure", new Object[] {flowFile, fragmentCount});
                 return REL_FAILURE;
             }
 
@@ -657,14 +657,14 @@ public class PutSQL extends AbstractProcessor {
                 selectedNumFragments = numFragments;
             } else if (numFragments != selectedNumFragments) {
                 getLogger().error("Cannot process {} because the 
fragment.count attribute has different values for different FlowFiles with the 
same fragment.identifier; "
-                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile});
+                    + "routing all FlowFiles with this fragment.identifier to 
failure", new Object[] {flowFile});
                 return REL_FAILURE;
             }
 
             final String fragmentIndex = 
flowFile.getAttribute(FRAGMENT_INDEX_ATTR);
             if (fragmentIndex == null) {
                 getLogger().error("Cannot process {} because the 
fragment.index attribute is missing; "
-                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile});
+                    + "routing all FlowFiles with this fragment.identifier to 
failure", new Object[] {flowFile});
                 return REL_FAILURE;
             }
 
@@ -673,19 +673,19 @@ public class PutSQL extends AbstractProcessor {
                 idx = Integer.parseInt(fragmentIndex);
             } catch (final NumberFormatException nfe) {
                 getLogger().error("Cannot process {} because the 
fragment.index attribute has a value of '{}', which is not an integer; "
-                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile, fragmentIndex});
+                    + "routing all FlowFiles with this fragment.identifier to 
failure", new Object[] {flowFile, fragmentIndex});
                 return REL_FAILURE;
             }
 
             if (idx < 0) {
                 getLogger().error("Cannot process {} because the 
fragment.index attribute has a value of '{}', which is not a positive integer; "
-                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile, fragmentIndex});
+                    + "routing all FlowFiles with this fragment.identifier to 
failure", new Object[] {flowFile, fragmentIndex});
                 return REL_FAILURE;
             }
 
             if (bitSet.get(idx)) {
                 getLogger().error("Cannot process {} because it has the same 
value for the fragment.index attribute as another FlowFile with the same 
fragment.identifier; "
-                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile});
+                    + "routing all FlowFiles with this fragment.identifier to 
failure", new Object[] {flowFile});
                 return REL_FAILURE;
             }
 
@@ -693,7 +693,7 @@ public class PutSQL extends AbstractProcessor {
         }
 
         if (selectedNumFragments == flowFiles.size()) {
-            return null;   // no relationship to route FlowFiles to yet - 
process the FlowFiles.
+            return null; // no relationship to route FlowFiles to yet - 
process the FlowFiles.
         }
 
         long latestQueueTime = 0L;
@@ -711,7 +711,7 @@ public class PutSQL extends AbstractProcessor {
         }
 
         getLogger().debug("Not enough FlowFiles for transaction. Returning all 
FlowFiles to queue");
-        return Relationship.SELF;  // not enough FlowFiles for this 
transaction. Return them all to queue.
+        return Relationship.SELF; // not enough FlowFiles for this 
transaction. Return them all to queue.
     }
 
     /**
@@ -769,7 +769,7 @@ public class PutSQL extends AbstractProcessor {
                     break;
                 default:
                     throw new SQLException("The '" + attrName + "' attribute 
has a value of '" + parameterValue
-                            + "' and a type of '" + jdbcType + "' but this is 
not a known data type");
+                        + "' and a type of '" + jdbcType + "' but this is not 
a known data type");
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/84db3725/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
index a348c9e..17506f7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
@@ -103,6 +103,7 @@ public class TestPutSQL {
         }
     }
 
+
     @Test
     public void testInsertWithGeneratedKeys() throws InitializationException, 
ProcessException, SQLException, IOException {
         final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
@@ -158,7 +159,7 @@ public class TestPutSQL {
         runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
         runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
         runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 
84)".getBytes());
-        runner.enqueue("INSERT INTO PERSONS".getBytes());  // intentionally 
wrong syntax
+        runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally 
wrong syntax
         runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 
3)".getBytes());
         runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 
44)".getBytes());
         runner.run();
@@ -256,6 +257,41 @@ public class TestPutSQL {
     }
 
 
+    @Test
+    public void testUsingSqlDataTypesWithNegativeValues() throws 
InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate("CREATE TABLE PERSONS (id integer primary 
key, name varchar(100), code bigint)");
+            }
+        }
+
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", "-5");
+        attributes.put("sql.args.1.value", "84");
+        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 
'Mark', ?)".getBytes(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
PERSONS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals("Mark", rs.getString(2));
+                assertEquals(84, rs.getInt(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
 
     @Test
     public void testStatementsWithPreparedParameters() throws 
InitializationException, ProcessException, SQLException, IOException {
@@ -343,7 +379,7 @@ public class TestPutSQL {
         runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
 
         final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
-                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+            "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
         attributes.put("sql.args.1.value", "1");
@@ -432,7 +468,7 @@ public class TestPutSQL {
         runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
 
         final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
-                "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
+            "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
         attributes.put("sql.args.1.value", "1");
@@ -471,7 +507,7 @@ public class TestPutSQL {
         runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
 
         final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
-                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+            "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
         attributes.put("sql.args.1.value", "1");
@@ -579,7 +615,7 @@ public class TestPutSQL {
         final MockFlowFile mff = new MockFlowFile(0L) {
             @Override
             public Long getLastQueueDate() {
-                return System.currentTimeMillis() - 10000L;   // return 10 
seconds ago
+                return System.currentTimeMillis() - 10000L; // return 10 
seconds ago
             }
 
             @Override

Reply via email to