Repository: nifi Updated Branches: refs/heads/master 3da8b94dd -> a6e94de0b
NIFI-2829 Date and Time Format Support for PutSQL This closes #1524. Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/03bff7c9 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/03bff7c9 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/03bff7c9 Branch: refs/heads/master Commit: 03bff7c9fc320a95dbacef1eb8390a1aae174dc4 Parents: 3da8b94 Author: patricker <[email protected]> Authored: Tue Feb 21 13:22:04 2017 -0700 Committer: Koji Kawamura <[email protected]> Committed: Tue Jul 18 14:11:37 2017 +0900 ---------------------------------------------------------------------- .../apache/nifi/processors/standard/PutSQL.java | 59 ++++++- .../nifi/processors/standard/TestPutSQL.java | 156 +++++++++++++++++++ 2 files changed, 208 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/03bff7c9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index 7e75964..b48bd0f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -72,6 +72,9 @@ import java.sql.Types; import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalAccessor; import java.util.ArrayList; @@ -110,11 +113,13 @@ import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnEr + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."), @ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. " + "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases " - + "a format option needs to be specified, currently this is only applicable for binary data types and timestamps. For binary data types " - + "available options are 'ascii', 'base64' and 'hex'. In 'ascii' format each string character in your attribute value represents a single byte, this is the default format " - + "and the format provided by Avro Processors. In 'base64' format your string is a Base64 encoded string. In 'hex' format the string is hex encoded with all " - + "letters in upper case and no '0x' at the beginning. For timestamps, the format can be specified according to java.time.format.DateTimeFormatter." - + "Customer and named patterns are accepted i.e. ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME')") + + "a format option needs to be specified, currently this is only applicable for binary data types, dates, times and timestamps. Binary Data Types (defaults to 'ascii') - " + + "ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. " + + "base64: the string is a Base64 encoded string that can be decoded to bytes. " + + "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. " + + "Dates/Times/Timestamps - " + + "Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') " + + "as specified according to java.time.format.DateTimeFormatter.") }) @WritesAttributes({ @WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, " @@ -828,10 +833,50 @@ public class PutSQL extends AbstractSessionFactoryProcessor { stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue)); break; case Types.DATE: - stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue))); + Date date; + + if (valueFormat.equals("")) { + if(LONG_PATTERN.matcher(parameterValue).matches()){ + date = new Date(Long.parseLong(parameterValue)); + }else { + String dateFormatString = "yyyy-MM-dd"; + if (!valueFormat.isEmpty()) { + dateFormatString = valueFormat; + } + SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString); + java.util.Date parsedDate = dateFormat.parse(parameterValue); + date = new Date(parsedDate.getTime()); + } + } else { + final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat); + LocalDate parsedDate = LocalDate.parse(parameterValue, dtFormatter); + date = new Date(Date.from(parsedDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()).getTime()); + } + + stmt.setDate(parameterIndex, date); break; case Types.TIME: - stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue))); + Time time; + + if (valueFormat.equals("")) { + if (LONG_PATTERN.matcher(parameterValue).matches()) { + time = new Time(Long.parseLong(parameterValue)); + } else { + String timeFormatString = "HH:mm:ss.SSS"; + if (!valueFormat.isEmpty()) { + timeFormatString = valueFormat; + } + SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString); + java.util.Date parsedDate = dateFormat.parse(parameterValue); + time = new Time(parsedDate.getTime()); + } + } else { + final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat); + LocalTime parsedTime = LocalTime.parse(parameterValue, dtFormatter); + time = Time.valueOf(parsedTime); + } + + stmt.setTime(parameterIndex, time); break; case Types.TIMESTAMP: long lTimestamp=0L; http://git-wip-us.apache.org/repos/asf/nifi/blob/03bff7c9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java index 2d0491b..c5c9a4d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -452,6 +452,77 @@ public class TestPutSQL { } @Test + public void testUsingDateTimeValuesWithFormatAttribute() throws InitializationException, ProcessException, SQLException, IOException, ParseException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST3 (id integer primary key, ts1 TIME, ts2 DATE)"); + } + } + + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String dateStr = "2002-02-02"; + final String timeStr = "12:02:02"; + + Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.TIME)); + attributes.put("sql.args.1.value", timeStr); + attributes.put("sql.args.1.format", "ISO_LOCAL_TIME"); + attributes.put("sql.args.2.type", String.valueOf(Types.DATE)); + attributes.put("sql.args.2.value", dateStr); + attributes.put("sql.args.2.format", "ISO_LOCAL_DATE"); + + runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes); + + attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.TIME)); + attributes.put("sql.args.1.value", "68522000"); + attributes.put("sql.args.2.type", String.valueOf(Types.DATE)); + attributes.put("sql.args.2.value", "1012633200000"); + + runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (2, ?, ?)".getBytes(), attributes); + + attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.TIME)); + attributes.put("sql.args.1.value", "120202000"); + attributes.put("sql.args.1.format", "HHmmssSSS"); + attributes.put("sql.args.2.type", String.valueOf(Types.DATE)); + attributes.put("sql.args.2.value", "20020202"); + attributes.put("sql.args.2.format", "yyyyMMdd"); + + runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (3, ?, ?)".getBytes(), attributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 3); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST3 ORDER BY ID"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(68522000L, rs.getTime(2).getTime()); + assertEquals(1012633200000L, rs.getDate(3).getTime()); + + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals(68522000L, rs.getTime(2).getTime()); + assertEquals(1012633200000L, rs.getDate(3).getTime()); + + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + assertEquals(68522000L, rs.getTime(2).getTime()); + assertEquals(1012633200000L, rs.getDate(3).getTime()); + + assertFalse(rs.next()); + } + } + } + + @Test public void testBitType() throws SQLException, InitializationException { final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); try (final Connection conn = service.getConnection()) { @@ -569,6 +640,91 @@ public class TestPutSQL { } @Test + public void testUsingTimeValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate("CREATE TABLE TIMETESTS (id integer primary key, ts1 time, ts2 time)"); + } + } + + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String arg2TS = "00:01:01"; + final String art3TS = "12:02:02"; + final String timeFormatString = "HH:mm:ss"; + SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString); + java.util.Date parsedDate = dateFormat.parse(arg2TS); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.TIME)); + attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime())); + + attributes.put("sql.args.2.type", String.valueOf(Types.TIME)); + attributes.put("sql.args.2.value", art3TS); + attributes.put("sql.args.2.format", timeFormatString); + + runner.enqueue("INSERT INTO TIMETESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMETESTS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(arg2TS, rs.getString(2)); + assertEquals(art3TS, rs.getString(3)); + assertFalse(rs.next()); + } + } + } + + @Test + public void testUsingDateValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate("CREATE TABLE DATETESTS (id integer primary key, ts1 date, ts2 date)"); + } + } + + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String arg2TS = "2001-01-01"; + final String art3TS = "2002-02-02"; + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + java.util.Date parsedDate = dateFormat.parse(arg2TS); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.DATE)); + attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime())); + attributes.put("sql.args.2.type", String.valueOf(Types.DATE)); + attributes.put("sql.args.2.value", art3TS); + + runner.enqueue("INSERT INTO DATETESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM DATETESTS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(arg2TS, rs.getString(2)); + assertEquals(art3TS, rs.getString(3)); + assertFalse(rs.next()); + } + } + } + + @Test public void testBinaryColumnTypes() throws InitializationException, ProcessException, SQLException, IOException, ParseException { final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); try (final Connection conn = service.getConnection()) {
