Repository: nifi
Updated Branches:
  refs/heads/master e4cee7ee0 -> 6466931c2


NIFI-3430 - added support for specifying sql.args.N.format for timestamp fields
according to java.time.format.DateTimeFormatter

This closes #1468.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6466931c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6466931c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6466931c

Branch: refs/heads/master
Commit: 6466931c236aa29bfd95da63e0e6588fca420f97
Parents: e4cee7e
Author: Nick Carenza <[email protected]>
Authored: Wed Feb 1 17:41:45 2017 -0800
Committer: Pierre Villard <[email protected]>
Committed: Mon Feb 6 18:32:57 2017 +0100

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/PutSQL.java | 44 +++++++++++++++---
 .../nifi/processors/standard/TestPutSQL.java    | 48 ++++++++++++++++++--
 2 files changed, 83 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6466931c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index 76901fe..3efe20e 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -60,6 +60,9 @@ import java.sql.Timestamp;
 import java.sql.Types;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalAccessor;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
@@ -96,10 +99,11 @@ import java.util.regex.Pattern;
                 + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and 
so on. The type of the sql.args.1.value Parameter is specified by the 
sql.args.1.type attribute."),
         @ReadsAttribute(attribute = "sql.args.N.format", description = "This 
attribute is always optional, but default options may not always work for your 
data. "
                 + "Incoming FlowFiles are expected to be parametrized SQL 
statements. In some cases "
-                + "a format option needs to be specified, currently this is 
only applicable for binary data types. For binary data types "
+                + "a format option needs to be specified, currently this is 
only applicable for binary data types and timestamps. For binary data types "
                 + "available options are 'ascii', 'base64' and 'hex'.  In 
'ascii' format each string character in your attribute value represents a 
single byte, this is the default format "
                 + "and the format provided by Avro Processors. In 'base64' 
format your string is a Base64 encoded string.  In 'hex' format the string is 
hex encoded with all "
-                + "letters in upper case and no '0x' at the beginning.")
+                + "letters in upper case and no '0x' at the beginning. For 
timestamps, the format can be specified according to 
java.time.format.DateTimeFormatter."
+                + "Customer and named patterns are accepted i.e. 
('yyyy-MM-dd','ISO_OFFSET_DATE_TIME')")
 })
 @WritesAttributes({
         @WritesAttribute(attribute = "sql.generated.key", description = "If 
the database generated a key for an INSERT statement and the Obtain Generated 
Keys property is set to true, "
@@ -792,11 +796,19 @@ public class PutSQL extends AbstractProcessor {
                 case Types.TIMESTAMP:
                     long lTimestamp=0L;
 
-                    if(LONG_PATTERN.matcher(parameterValue).matches()){
-                        lTimestamp = Long.parseLong(parameterValue);
+                    // Backwards compatibility note: Format was unsupported 
for a timestamp field.
+                    if (valueFormat.equals("")) {
+                        if(LONG_PATTERN.matcher(parameterValue).matches()){
+                            lTimestamp = Long.parseLong(parameterValue);
+                        } else {
+                            final SimpleDateFormat dateFormat  = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+                            java.util.Date parsedDate = 
dateFormat.parse(parameterValue);
+                            lTimestamp = parsedDate.getTime();
+                        }
                     }else {
-                        SimpleDateFormat dateFormat = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-                        java.util.Date parsedDate = 
dateFormat.parse(parameterValue);
+                        final DateTimeFormatter dtFormatter = 
getDateTimeFormatter(valueFormat);
+                        TemporalAccessor accessor = 
dtFormatter.parse(parameterValue);
+                        java.util.Date parsedDate = 
java.util.Date.from(Instant.from(accessor));
                         lTimestamp = parsedDate.getTime();
                     }
 
@@ -839,6 +851,26 @@ public class PutSQL extends AbstractProcessor {
         }
     }
 
+    private DateTimeFormatter getDateTimeFormatter(String pattern) {
+        switch(pattern) {
+            case "BASIC_ISO_DATE": return DateTimeFormatter.BASIC_ISO_DATE;
+            case "ISO_LOCAL_DATE": return DateTimeFormatter.ISO_LOCAL_DATE;
+            case "ISO_OFFSET_DATE": return DateTimeFormatter.ISO_OFFSET_DATE;
+            case "ISO_DATE": return DateTimeFormatter.ISO_DATE;
+            case "ISO_LOCAL_TIME": return DateTimeFormatter.ISO_LOCAL_TIME;
+            case "ISO_OFFSET_TIME": return DateTimeFormatter.ISO_OFFSET_TIME;
+            case "ISO_TIME": return DateTimeFormatter.ISO_TIME;
+            case "ISO_LOCAL_DATE_TIME": return 
DateTimeFormatter.ISO_LOCAL_DATE_TIME;
+            case "ISO_OFFSET_DATE_TIME": return 
DateTimeFormatter.ISO_OFFSET_DATE_TIME;
+            case "ISO_ZONED_DATE_TIME": return 
DateTimeFormatter.ISO_ZONED_DATE_TIME;
+            case "ISO_DATE_TIME": return DateTimeFormatter.ISO_DATE_TIME;
+            case "ISO_ORDINAL_DATE": return DateTimeFormatter.ISO_ORDINAL_DATE;
+            case "ISO_WEEK_DATE": return DateTimeFormatter.ISO_WEEK_DATE;
+            case "ISO_INSTANT": return DateTimeFormatter.ISO_INSTANT;
+            case "RFC_1123_DATE_TIME": return 
DateTimeFormatter.RFC_1123_DATE_TIME;
+            default: return DateTimeFormatter.ofPattern(pattern);
+        }
+    }
 
     /**
      * A FlowFileFilter that is responsible for ensuring that the FlowFiles 
returned either belong

http://git-wip-us.apache.org/repos/asf/nifi/blob/6466931c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
index 321bac7..25ec476 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
@@ -268,12 +268,13 @@ public class TestPutSQL {
         }
     }
 
+    // Not specifying a format for the date fields here to continue to test 
backwards compatibility
     @Test
     public void testUsingTimestampValuesEpochAndString() throws 
InitializationException, ProcessException, SQLException, IOException, 
ParseException {
         final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
         try (final Connection conn = service.getConnection()) {
             try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate("CREATE TABLE TIMESTAMPTESTS (id integer 
primary key, ts1 timestamp, ts2 timestamp)");
+                stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST1 (id integer 
primary key, ts1 timestamp, ts2 timestamp)");
             }
         }
 
@@ -292,14 +293,14 @@ public class TestPutSQL {
         attributes.put("sql.args.2.type", String.valueOf(Types.TIMESTAMP));
         attributes.put("sql.args.2.value", art3TS);
 
-        runner.enqueue("INSERT INTO TIMESTAMPTESTS (ID, ts1, ts2) VALUES (1, 
?, ?)".getBytes(), attributes);
+        runner.enqueue("INSERT INTO TIMESTAMPTEST1 (ID, ts1, ts2) VALUES (1, 
?, ?)".getBytes(), attributes);
         runner.run();
 
         runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
 
         try (final Connection conn = service.getConnection()) {
             try (final Statement stmt = conn.createStatement()) {
-                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
TIMESTAMPTESTS");
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
TIMESTAMPTEST1");
                 assertTrue(rs.next());
                 assertEquals(1, rs.getInt(1));
                 assertEquals(arg2TS, rs.getString(2));
@@ -310,6 +311,47 @@ public class TestPutSQL {
     }
 
     @Test
+    public void testUsingTimestampValuesWithFormatAttribute() throws 
InitializationException, ProcessException, SQLException, IOException, 
ParseException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST2 (id integer 
primary key, ts1 timestamp, ts2 timestamp)");
+            }
+        }
+
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+
+        final String dateStr = "2002-02-02T12:02:02+00:00";
+        final long dateInt = 1012651322000L;
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.TIMESTAMP));
+        attributes.put("sql.args.1.value", dateStr);
+        attributes.put("sql.args.1.format", "ISO_OFFSET_DATE_TIME");
+        attributes.put("sql.args.2.type", String.valueOf(Types.TIMESTAMP));
+        attributes.put("sql.args.2.value", dateStr);
+        attributes.put("sql.args.2.format", "yyyy-MM-dd'T'HH:mm:ssXXX");
+
+        runner.enqueue("INSERT INTO TIMESTAMPTEST2 (ID, ts1, ts2) VALUES (1, 
?, ?)".getBytes(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
TIMESTAMPTEST2");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals(dateInt, rs.getTimestamp(2).getTime());
+                assertEquals(dateInt, rs.getTimestamp(3).getTime());
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
     public void testBinaryColumnTypes() throws InitializationException, 
ProcessException, SQLException, IOException, ParseException {
         final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
         try (final Connection conn = service.getConnection()) {

Reply via email to