Repository: nifi Updated Branches: refs/heads/master e4cee7ee0 -> 6466931c2
NIFI-3430 - added support for specifying sql.args.N.format for timestamp fields according to java.time.format.DateTimeFormatter. This closes #1468. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6466931c Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6466931c Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6466931c Branch: refs/heads/master Commit: 6466931c236aa29bfd95da63e0e6588fca420f97 Parents: e4cee7e Author: Nick Carenza <[email protected]> Authored: Wed Feb 1 17:41:45 2017 -0800 Committer: Pierre Villard <[email protected]> Committed: Mon Feb 6 18:32:57 2017 +0100 ---------------------------------------------------------------------- .../apache/nifi/processors/standard/PutSQL.java | 44 +++++++++++++++--- .../nifi/processors/standard/TestPutSQL.java | 48 ++++++++++++++++++-- 2 files changed, 83 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6466931c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index 76901fe..3efe20e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -60,6 +60,9 @@ import java.sql.Timestamp; import java.sql.Types; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.time.Instant; +import 
java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAccessor; import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; @@ -96,10 +99,11 @@ import java.util.regex.Pattern; + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."), @ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. " + "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases " - + "a format option needs to be specified, currently this is only applicable for binary data types. For binary data types " + + "a format option needs to be specified, currently this is only applicable for binary data types and timestamps. For binary data types " + "available options are 'ascii', 'base64' and 'hex'. In 'ascii' format each string character in your attribute value represents a single byte, this is the default format " + "and the format provided by Avro Processors. In 'base64' format your string is a Base64 encoded string. In 'hex' format the string is hex encoded with all " - + "letters in upper case and no '0x' at the beginning.") + + "letters in upper case and no '0x' at the beginning. For timestamps, the format can be specified according to java.time.format.DateTimeFormatter." + + "Customer and named patterns are accepted i.e. 
('yyyy-MM-dd','ISO_OFFSET_DATE_TIME')") }) @WritesAttributes({ @WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, " @@ -792,11 +796,19 @@ public class PutSQL extends AbstractProcessor { case Types.TIMESTAMP: long lTimestamp=0L; - if(LONG_PATTERN.matcher(parameterValue).matches()){ - lTimestamp = Long.parseLong(parameterValue); + // Backwards compatibility note: Format was unsupported for a timestamp field. + if (valueFormat.equals("")) { + if(LONG_PATTERN.matcher(parameterValue).matches()){ + lTimestamp = Long.parseLong(parameterValue); + } else { + final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + java.util.Date parsedDate = dateFormat.parse(parameterValue); + lTimestamp = parsedDate.getTime(); + } }else { - SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - java.util.Date parsedDate = dateFormat.parse(parameterValue); + final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat); + TemporalAccessor accessor = dtFormatter.parse(parameterValue); + java.util.Date parsedDate = java.util.Date.from(Instant.from(accessor)); lTimestamp = parsedDate.getTime(); } @@ -839,6 +851,26 @@ public class PutSQL extends AbstractProcessor { } } + private DateTimeFormatter getDateTimeFormatter(String pattern) { + switch(pattern) { + case "BASIC_ISO_DATE": return DateTimeFormatter.BASIC_ISO_DATE; + case "ISO_LOCAL_DATE": return DateTimeFormatter.ISO_LOCAL_DATE; + case "ISO_OFFSET_DATE": return DateTimeFormatter.ISO_OFFSET_DATE; + case "ISO_DATE": return DateTimeFormatter.ISO_DATE; + case "ISO_LOCAL_TIME": return DateTimeFormatter.ISO_LOCAL_TIME; + case "ISO_OFFSET_TIME": return DateTimeFormatter.ISO_OFFSET_TIME; + case "ISO_TIME": return DateTimeFormatter.ISO_TIME; + case "ISO_LOCAL_DATE_TIME": return DateTimeFormatter.ISO_LOCAL_DATE_TIME; + case "ISO_OFFSET_DATE_TIME": return 
DateTimeFormatter.ISO_OFFSET_DATE_TIME; + case "ISO_ZONED_DATE_TIME": return DateTimeFormatter.ISO_ZONED_DATE_TIME; + case "ISO_DATE_TIME": return DateTimeFormatter.ISO_DATE_TIME; + case "ISO_ORDINAL_DATE": return DateTimeFormatter.ISO_ORDINAL_DATE; + case "ISO_WEEK_DATE": return DateTimeFormatter.ISO_WEEK_DATE; + case "ISO_INSTANT": return DateTimeFormatter.ISO_INSTANT; + case "RFC_1123_DATE_TIME": return DateTimeFormatter.RFC_1123_DATE_TIME; + default: return DateTimeFormatter.ofPattern(pattern); + } + } /** * A FlowFileFilter that is responsible for ensuring that the FlowFiles returned either belong http://git-wip-us.apache.org/repos/asf/nifi/blob/6466931c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java index 321bac7..25ec476 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -268,12 +268,13 @@ public class TestPutSQL { } } + // Not specifying a format for the date fields here to continue to test backwards compatibility @Test public void testUsingTimestampValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException { final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); try (final Connection conn = service.getConnection()) { try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate("CREATE TABLE TIMESTAMPTESTS (id integer primary key, ts1 
timestamp, ts2 timestamp)"); + stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST1 (id integer primary key, ts1 timestamp, ts2 timestamp)"); } } @@ -292,14 +293,14 @@ public class TestPutSQL { attributes.put("sql.args.2.type", String.valueOf(Types.TIMESTAMP)); attributes.put("sql.args.2.value", art3TS); - runner.enqueue("INSERT INTO TIMESTAMPTESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes); + runner.enqueue("INSERT INTO TIMESTAMPTEST1 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes); runner.run(); runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); try (final Connection conn = service.getConnection()) { try (final Statement stmt = conn.createStatement()) { - final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTESTS"); + final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST1"); assertTrue(rs.next()); assertEquals(1, rs.getInt(1)); assertEquals(arg2TS, rs.getString(2)); @@ -310,6 +311,47 @@ public class TestPutSQL { } @Test + public void testUsingTimestampValuesWithFormatAttribute() throws InitializationException, ProcessException, SQLException, IOException, ParseException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST2 (id integer primary key, ts1 timestamp, ts2 timestamp)"); + } + } + + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String dateStr = "2002-02-02T12:02:02+00:00"; + final long dateInt = 1012651322000L; + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.TIMESTAMP)); + attributes.put("sql.args.1.value", dateStr); + attributes.put("sql.args.1.format", "ISO_OFFSET_DATE_TIME"); + attributes.put("sql.args.2.type", String.valueOf(Types.TIMESTAMP)); + 
attributes.put("sql.args.2.value", dateStr); + attributes.put("sql.args.2.format", "yyyy-MM-dd'T'HH:mm:ssXXX"); + + runner.enqueue("INSERT INTO TIMESTAMPTEST2 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST2"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(dateInt, rs.getTimestamp(2).getTime()); + assertEquals(dateInt, rs.getTimestamp(3).getTime()); + assertFalse(rs.next()); + } + } + } + + @Test public void testBinaryColumnTypes() throws InitializationException, ProcessException, SQLException, IOException, ParseException { final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); try (final Connection conn = service.getConnection()) {
