Repository: nifi
Updated Branches:
  refs/heads/master 3da8b94dd -> a6e94de0b


NIFI-2829 Date and Time Format Support for PutSQL

This closes #1524.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/03bff7c9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/03bff7c9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/03bff7c9

Branch: refs/heads/master
Commit: 03bff7c9fc320a95dbacef1eb8390a1aae174dc4
Parents: 3da8b94
Author: patricker <[email protected]>
Authored: Tue Feb 21 13:22:04 2017 -0700
Committer: Koji Kawamura <[email protected]>
Committed: Tue Jul 18 14:11:37 2017 +0900

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/PutSQL.java |  59 ++++++-
 .../nifi/processors/standard/TestPutSQL.java    | 156 +++++++++++++++++++
 2 files changed, 208 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/03bff7c9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index 7e75964..b48bd0f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -72,6 +72,9 @@ import java.sql.Types;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.TemporalAccessor;
 import java.util.ArrayList;
@@ -110,11 +113,13 @@ import static 
org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnEr
                 + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and 
so on. The type of the sql.args.1.value Parameter is specified by the 
sql.args.1.type attribute."),
         @ReadsAttribute(attribute = "sql.args.N.format", description = "This 
attribute is always optional, but default options may not always work for your 
data. "
                 + "Incoming FlowFiles are expected to be parametrized SQL 
statements. In some cases "
-                + "a format option needs to be specified, currently this is 
only applicable for binary data types and timestamps. For binary data types "
-                + "available options are 'ascii', 'base64' and 'hex'.  In 
'ascii' format each string character in your attribute value represents a 
single byte, this is the default format "
-                + "and the format provided by Avro Processors. In 'base64' 
format your string is a Base64 encoded string.  In 'hex' format the string is 
hex encoded with all "
-                + "letters in upper case and no '0x' at the beginning. For 
timestamps, the format can be specified according to 
java.time.format.DateTimeFormatter."
-                + "Customer and named patterns are accepted i.e. 
('yyyy-MM-dd','ISO_OFFSET_DATE_TIME')")
+                + "a format option needs to be specified, currently this is 
only applicable for binary data types, dates, times and timestamps. Binary Data 
Types (defaults to 'ascii') - "
+                + "ascii: each string character in your attribute value 
represents a single byte. This is the format provided by Avro Processors. "
+                + "base64: the string is a Base64 encoded string that can be 
decoded to bytes. "
+                + "hex: the string is hex encoded with all letters in upper 
case and no '0x' at the beginning. "
+                + "Dates/Times/Timestamps - "
+                + "Date, Time and Timestamp formats all support both custom 
formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') "
+                + "as specified according to 
java.time.format.DateTimeFormatter.")
 })
 @WritesAttributes({
         @WritesAttribute(attribute = "sql.generated.key", description = "If 
the database generated a key for an INSERT statement and the Obtain Generated 
Keys property is set to true, "
@@ -828,10 +833,50 @@ public class PutSQL extends 
AbstractSessionFactoryProcessor {
                     stmt.setBigDecimal(parameterIndex, new 
BigDecimal(parameterValue));
                     break;
                 case Types.DATE:
-                    stmt.setDate(parameterIndex, new 
Date(Long.parseLong(parameterValue)));
+                    Date date;
+
+                    if (valueFormat.equals("")) {
+                        if(LONG_PATTERN.matcher(parameterValue).matches()){
+                            date = new Date(Long.parseLong(parameterValue));
+                        }else {
+                            String dateFormatString = "yyyy-MM-dd";
+                            if (!valueFormat.isEmpty()) {
+                                dateFormatString = valueFormat;
+                            }
+                            SimpleDateFormat dateFormat = new 
SimpleDateFormat(dateFormatString);
+                            java.util.Date parsedDate = 
dateFormat.parse(parameterValue);
+                            date = new Date(parsedDate.getTime());
+                        }
+                    } else {
+                        final DateTimeFormatter dtFormatter = 
getDateTimeFormatter(valueFormat);
+                        LocalDate parsedDate = LocalDate.parse(parameterValue, 
dtFormatter);
+                        date = new 
Date(Date.from(parsedDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()).getTime());
+                    }
+
+                    stmt.setDate(parameterIndex, date);
                     break;
                 case Types.TIME:
-                    stmt.setTime(parameterIndex, new 
Time(Long.parseLong(parameterValue)));
+                    Time time;
+
+                    if (valueFormat.equals("")) {
+                        if (LONG_PATTERN.matcher(parameterValue).matches()) {
+                            time = new Time(Long.parseLong(parameterValue));
+                        } else {
+                            String timeFormatString = "HH:mm:ss.SSS";
+                            if (!valueFormat.isEmpty()) {
+                                timeFormatString = valueFormat;
+                            }
+                            SimpleDateFormat dateFormat = new 
SimpleDateFormat(timeFormatString);
+                            java.util.Date parsedDate = 
dateFormat.parse(parameterValue);
+                            time = new Time(parsedDate.getTime());
+                        }
+                    } else {
+                        final DateTimeFormatter dtFormatter = 
getDateTimeFormatter(valueFormat);
+                        LocalTime parsedTime = LocalTime.parse(parameterValue, 
dtFormatter);
+                        time = Time.valueOf(parsedTime);
+                    }
+
+                    stmt.setTime(parameterIndex, time);
                     break;
                 case Types.TIMESTAMP:
                     long lTimestamp=0L;

http://git-wip-us.apache.org/repos/asf/nifi/blob/03bff7c9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
index 2d0491b..c5c9a4d 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
@@ -452,6 +452,77 @@ public class TestPutSQL {
     }
 
     @Test
+    public void testUsingDateTimeValuesWithFormatAttribute() throws 
InitializationException, ProcessException, SQLException, IOException, 
ParseException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST3 (id integer 
primary key, ts1 TIME, ts2 DATE)");
+            }
+        }
+
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+
+        final String dateStr = "2002-02-02";
+        final String timeStr = "12:02:02";
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
+        attributes.put("sql.args.1.value", timeStr);
+        attributes.put("sql.args.1.format", "ISO_LOCAL_TIME");
+        attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
+        attributes.put("sql.args.2.value", dateStr);
+        attributes.put("sql.args.2.format", "ISO_LOCAL_DATE");
+
+        runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (1, 
?, ?)".getBytes(), attributes);
+
+        attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
+        attributes.put("sql.args.1.value", "68522000");
+        attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
+        attributes.put("sql.args.2.value", "1012633200000");
+
+        runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (2, 
?, ?)".getBytes(), attributes);
+
+        attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
+        attributes.put("sql.args.1.value", "120202000");
+        attributes.put("sql.args.1.format", "HHmmssSSS");
+        attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
+        attributes.put("sql.args.2.value", "20020202");
+        attributes.put("sql.args.2.format", "yyyyMMdd");
+
+        runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (3, 
?, ?)".getBytes(), attributes);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 3);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
TIMESTAMPTEST3 ORDER BY ID");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals(68522000L, rs.getTime(2).getTime());
+                assertEquals(1012633200000L, rs.getDate(3).getTime());
+
+                assertTrue(rs.next());
+                assertEquals(2, rs.getInt(1));
+                assertEquals(68522000L, rs.getTime(2).getTime());
+                assertEquals(1012633200000L, rs.getDate(3).getTime());
+
+                assertTrue(rs.next());
+                assertEquals(3, rs.getInt(1));
+                assertEquals(68522000L, rs.getTime(2).getTime());
+                assertEquals(1012633200000L, rs.getDate(3).getTime());
+
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
     public void testBitType() throws SQLException, InitializationException {
         final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
         try (final Connection conn = service.getConnection()) {
@@ -569,6 +640,91 @@ public class TestPutSQL {
     }
 
     @Test
+    public void testUsingTimeValuesEpochAndString() throws 
InitializationException, ProcessException, SQLException, IOException, 
ParseException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate("CREATE TABLE TIMETESTS (id integer primary 
key, ts1 time, ts2 time)");
+            }
+        }
+
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+
+        final String arg2TS = "00:01:01";
+        final String art3TS = "12:02:02";
+        final String timeFormatString = "HH:mm:ss";
+        SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString);
+        java.util.Date parsedDate = dateFormat.parse(arg2TS);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
+        attributes.put("sql.args.1.value", 
Long.toString(parsedDate.getTime()));
+
+        attributes.put("sql.args.2.type", String.valueOf(Types.TIME));
+        attributes.put("sql.args.2.value", art3TS);
+        attributes.put("sql.args.2.format", timeFormatString);
+
+        runner.enqueue("INSERT INTO TIMETESTS (ID, ts1, ts2) VALUES (1, ?, 
?)".getBytes(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
TIMETESTS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals(arg2TS, rs.getString(2));
+                assertEquals(art3TS, rs.getString(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
+    public void testUsingDateValuesEpochAndString() throws 
InitializationException, ProcessException, SQLException, IOException, 
ParseException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate("CREATE TABLE DATETESTS (id integer primary 
key, ts1 date, ts2 date)");
+            }
+        }
+
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+
+        final String arg2TS = "2001-01-01";
+        final String art3TS = "2002-02-02";
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+        java.util.Date parsedDate = dateFormat.parse(arg2TS);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.DATE));
+        attributes.put("sql.args.1.value", 
Long.toString(parsedDate.getTime()));
+        attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
+        attributes.put("sql.args.2.value", art3TS);
+
+        runner.enqueue("INSERT INTO DATETESTS (ID, ts1, ts2) VALUES (1, ?, 
?)".getBytes(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
DATETESTS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals(arg2TS, rs.getString(2));
+                assertEquals(art3TS, rs.getString(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
     public void testBinaryColumnTypes() throws InitializationException, 
ProcessException, SQLException, IOException, ParseException {
         final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
         try (final Connection conn = service.getConnection()) {

Reply via email to