This is an automated email from the ASF dual-hosted git repository.
dstiggy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new cb2117f8e6 NIFI-13579 Improved Timestamp Zone Offset Formatting and Parsing
cb2117f8e6 is described below
commit cb2117f8e6c8a5bfff52ff43d5d47fb5a725546c
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Oct 9 01:46:14 2024 -0500
NIFI-13579 Improved Timestamp Zone Offset Formatting and Parsing
- Improved Timestamp to String formatting to support patterns with zone offsets
- Improved String to Timestamp parsing to support adjusted hours and minutes when zone offset is included
---
.../field/ObjectLocalDateTimeFieldConverter.java | 22 +++-
.../record/field/ObjectStringFieldConverter.java | 9 +-
.../field/ObjectStringFieldConverterTest.java | 135 +++++++++++++++++++++
.../field/ObjectTimestampFieldConverterTest.java | 48 ++++++++
.../iceberg/TestIcebergRecordConverter.java | 9 +-
.../processors/standard/TestValidateRecord.java | 71 +++++++++++
6 files changed, 285 insertions(+), 9 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectLocalDateTimeFieldConverter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectLocalDateTimeFieldConverter.java
index 6408949537..37d8fbe3ad 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectLocalDateTimeFieldConverter.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectLocalDateTimeFieldConverter.java
@@ -21,8 +21,10 @@ import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
+import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
+import java.time.temporal.TemporalAccessor;
import java.util.Date;
import java.util.Optional;
@@ -70,7 +72,7 @@ class ObjectLocalDateTimeFieldConverter implements FieldConverter<Object, LocalD
if (pattern.isPresent()) {
final DateTimeFormatter formatter =
DateTimeFormatterRegistry.getDateTimeFormatter(pattern.get());
try {
- return LocalDateTime.parse(string, formatter);
+ return parseLocalDateTime(field, name, string, formatter);
} catch (final DateTimeParseException e) {
return tryParseAsNumber(string, name);
}
@@ -82,6 +84,24 @@ class ObjectLocalDateTimeFieldConverter implements FieldConverter<Object, LocalD
throw new FieldConversionException(LocalDateTime.class, field, name);
}
+ private LocalDateTime parseLocalDateTime(final Object field, final String
name, final String string, final DateTimeFormatter formatter) {
+ final LocalDateTime parsed;
+
+ // Attempt ZonedDateTime parsing before LocalDateTime to handle zone
offsets
+ final TemporalAccessor resolved = formatter.parseBest(string,
ZonedDateTime::from, LocalDateTime::from);
+ if (resolved instanceof ZonedDateTime zonedDateTime) {
+ // Convert Instant to LocalDateTime using system default zone
offset to incorporate adjusted hours and minutes
+ final Instant instant = zonedDateTime.toInstant();
+ parsed = ofInstant(instant);
+ } else if (resolved instanceof LocalDateTime localDateTime) {
+ parsed = localDateTime;
+ } else {
+ throw new FieldConversionException(LocalDateTime.class, field,
name);
+ }
+
+ return parsed;
+ }
+
private LocalDateTime tryParseAsNumber(final String value, final String
fieldName) {
try {
// If decimal, treat as a double and convert to seconds and
nanoseconds.
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectStringFieldConverter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectStringFieldConverter.java
index 717b81f93e..563e564be7 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectStringFieldConverter.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectStringFieldConverter.java
@@ -26,16 +26,12 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
-import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
/**
* Convert Object to String using instanceof evaluation and optional format
pattern for DateTimeFormatter
*/
class ObjectStringFieldConverter implements FieldConverter<Object, String> {
- private static final Map<String, DateTimeFormatter> FORMATTERS = new
ConcurrentHashMap<>();
-
/**
* Convert Object field to String using optional format supported in
DateTimeFormatter
*
@@ -59,7 +55,10 @@ class ObjectStringFieldConverter implements FieldConverter<Object, String> {
}
final DateTimeFormatter formatter =
DateTimeFormatterRegistry.getDateTimeFormatter(pattern.get());
final LocalDateTime localDateTime = timestamp.toLocalDateTime();
- return formatter.format(localDateTime);
+
+ // Convert LocalDateTime to ZonedDateTime using system default
zone to support offsets in Date Time Formatter
+ final ZonedDateTime dateTime = ZonedDateTime.of(localDateTime,
ZoneId.systemDefault());
+ return formatter.format(dateTime);
}
if (field instanceof java.util.Date date) {
if (pattern.isEmpty()) {
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectStringFieldConverterTest.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectStringFieldConverterTest.java
new file mode 100644
index 0000000000..67f0c4f947
--- /dev/null
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectStringFieldConverterTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record.field;
+
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.zone.ZoneRules;
+import java.util.Date;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+class ObjectStringFieldConverterTest {
+ private static final ObjectStringFieldConverter CONVERTER = new
ObjectStringFieldConverter();
+
+ private static final String DEFAULT_PATTERN =
RecordFieldType.TIMESTAMP.getDefaultFormat();
+
+ private static final String FIELD_NAME = Timestamp.class.getSimpleName();
+
+ private static final String DATE_TIME_DEFAULT = "2000-01-01 12:00:00";
+
+ private static final String DATE_TIME_NANOSECONDS_PATTERN = "yyyy-MM-dd
HH:mm:ss.SSSSSSSSS";
+
+ private static final String DATE_TIME_NANOSECONDS = "2000-01-01
12:00:00.123456789";
+
+ private static final String DATE_TIME_ZONE_OFFSET_PATTERN = "yyyy-MM-dd
HH:mm:ssZZZZZ";
+
+ @Test
+ void testConvertFieldNull() {
+ final String string = CONVERTER.convertField(null,
Optional.of(DEFAULT_PATTERN), FIELD_NAME);
+ assertNull(string);
+ }
+
+ @Test
+ void testConvertFieldTimestampDefaultPattern() {
+ final Timestamp timestamp = Timestamp.valueOf(DATE_TIME_DEFAULT);
+
+ final String string = CONVERTER.convertField(timestamp,
Optional.of(DEFAULT_PATTERN), FIELD_NAME);
+
+ assertEquals(DATE_TIME_DEFAULT, string);
+ }
+
+ @Test
+ void testConvertFieldTimestampNanoseconds() {
+ final Timestamp timestamp = Timestamp.valueOf(DATE_TIME_NANOSECONDS);
+
+ final String string = CONVERTER.convertField(timestamp,
Optional.of(DATE_TIME_NANOSECONDS_PATTERN), FIELD_NAME);
+
+ assertEquals(DATE_TIME_NANOSECONDS, string);
+ }
+
+ @Test
+ void testConvertFieldTimestampEmptyPattern() {
+ final Timestamp timestamp = Timestamp.valueOf(DATE_TIME_DEFAULT);
+
+ final String string = CONVERTER.convertField(timestamp,
Optional.empty(), FIELD_NAME);
+
+ final String expected = Long.toString(timestamp.getTime());
+ assertEquals(expected, string);
+ }
+
+ @Test
+ void testConvertFieldTimestampZoneOffsetPattern() {
+ final Timestamp timestamp = Timestamp.valueOf(DATE_TIME_DEFAULT);
+
+ final String string = CONVERTER.convertField(timestamp,
Optional.of(DATE_TIME_ZONE_OFFSET_PATTERN), FIELD_NAME);
+
+ final String dateTimeZoneOffsetExpected = getDateTimeZoneOffset();
+ assertEquals(dateTimeZoneOffsetExpected, string);
+ }
+
+ @Test
+ void testConvertFieldDateDefaultPattern() {
+ final Date date = new
Date(Timestamp.valueOf(DATE_TIME_DEFAULT).getTime());
+
+ final String string = CONVERTER.convertField(date,
Optional.of(DEFAULT_PATTERN), FIELD_NAME);
+
+ assertEquals(DATE_TIME_DEFAULT, string);
+ }
+
+ @Test
+ void testConvertFieldDateEmptyPattern() {
+ final Date date = new
Date(Timestamp.valueOf(DATE_TIME_DEFAULT).getTime());
+
+ final String string = CONVERTER.convertField(date, Optional.empty(),
FIELD_NAME);
+
+ final String expected = Long.toString(date.getTime());
+ assertEquals(expected, string);
+ }
+
+ @Test
+ void testConvertFieldDateZoneOffsetPattern() {
+ final Timestamp inputTimestamp = Timestamp.valueOf(DATE_TIME_DEFAULT);
+ final long inputTime = inputTimestamp.getTime();
+ final Date date = new Date(inputTime);
+
+ final String string = CONVERTER.convertField(date,
Optional.of(DATE_TIME_ZONE_OFFSET_PATTERN), FIELD_NAME);
+
+ final String dateTimeZoneOffsetExpected = getDateTimeZoneOffset();
+ assertEquals(dateTimeZoneOffsetExpected, string);
+ }
+
+ private String getDateTimeZoneOffset() {
+ final Timestamp inputTimestamp = Timestamp.valueOf(DATE_TIME_DEFAULT);
+ final LocalDateTime inputLocalDateTime =
inputTimestamp.toLocalDateTime();
+
+ final ZoneId systemDefaultZoneId = ZoneOffset.systemDefault();
+ final ZoneRules zoneRules = systemDefaultZoneId.getRules();
+ final ZoneOffset inputZoneOffset =
zoneRules.getOffset(inputLocalDateTime);
+ final String inputZoneOffsetId = inputZoneOffset.getId();
+
+ // Get Date Time with Zone Offset from current system configuration
+ return DATE_TIME_DEFAULT + inputZoneOffsetId;
+ }
+}
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverterTest.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverterTest.java
index cd69236c4c..942049d8ec 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverterTest.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverterTest.java
@@ -20,6 +20,12 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.junit.jupiter.api.Test;
import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.zone.ZoneRules;
import java.util.Date;
import java.util.Optional;
@@ -39,6 +45,10 @@ public class ObjectTimestampFieldConverterTest {
private static final String DATE_TIME_DEFAULT = "2000-01-01 12:00:00";
+ private static final String DATE_TIME_ZONE_OFFSET_PATTERN = "yyyy-MM-dd
HH:mm:ssZZZZZ";
+
+ private static final String DATE_TIME_UTC_OFFSET = "2000-01-01
12:00:00+00:00";
+
private static final Optional<String> DATE_TIME_NANOSECONDS_PATTERN =
Optional.of("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
private static final String DATE_TIME_NANOSECONDS = "2000-01-01
12:00:00.123456789";
@@ -110,4 +120,42 @@ public class ObjectTimestampFieldConverterTest {
final FieldConversionException exception =
assertThrows(FieldConversionException.class, () ->
CONVERTER.convertField(DATE_TIME_DEFAULT, DATE_TIME_NANOSECONDS_PATTERN,
FIELD_NAME));
assertTrue(exception.getMessage().contains(DATE_TIME_DEFAULT));
}
+
+ @Test
+ public void testConvertFieldStringFormatCustomZoneOffsetSystemDefault() {
+ final String dateTimeZoneOffset = getDateTimeZoneOffset();
+ final Timestamp timestamp = CONVERTER.convertField(dateTimeZoneOffset,
Optional.of(DATE_TIME_ZONE_OFFSET_PATTERN), FIELD_NAME);
+ final Timestamp expected = Timestamp.valueOf(DATE_TIME_DEFAULT);
+ assertEquals(expected, timestamp);
+ }
+
+ @Test
+ public void
testConvertFieldStringFormatCustomZoneOffsetCoordinatedUniversalTime() {
+ final Timestamp timestamp =
CONVERTER.convertField(DATE_TIME_UTC_OFFSET,
Optional.of(DATE_TIME_ZONE_OFFSET_PATTERN), FIELD_NAME);
+ final Timestamp expected = getDateTimeCoordinatedUniversalTime();
+ assertEquals(expected, timestamp);
+ }
+
+ private Timestamp getDateTimeCoordinatedUniversalTime() {
+ final Timestamp dateTime = Timestamp.valueOf(DATE_TIME_DEFAULT);
+ final LocalDateTime localDateTime = dateTime.toLocalDateTime();
+
+ final ZonedDateTime zonedDateTime = ZonedDateTime.of(localDateTime,
ZoneOffset.UTC);
+ final Instant instant = zonedDateTime.toInstant();
+ final LocalDateTime localDateTimeAdjusted =
LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
+ return Timestamp.valueOf(localDateTimeAdjusted);
+ }
+
+ private String getDateTimeZoneOffset() {
+ final Timestamp inputTimestamp = Timestamp.valueOf(DATE_TIME_DEFAULT);
+ final LocalDateTime inputLocalDateTime =
inputTimestamp.toLocalDateTime();
+
+ final ZoneId systemDefaultZoneId = ZoneOffset.systemDefault();
+ final ZoneRules zoneRules = systemDefaultZoneId.getRules();
+ final ZoneOffset inputZoneOffset =
zoneRules.getOffset(inputLocalDateTime);
+ final String inputZoneOffsetId = inputZoneOffset.getId();
+
+ // Get Date Time with Zone Offset from current system configuration
+ return DATE_TIME_DEFAULT + inputZoneOffsetId;
+ }
}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
index 4423541b24..4db691b072 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -69,7 +69,9 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
+import java.time.ZoneId;
import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -512,8 +514,8 @@ public class TestIcebergRecordConverter {
values.put("binary", "hello".getBytes());
values.put("date", "2017-04-04");
values.put("time", "14:20:33.000");
- values.put("timestamp", "2017-04-04 14:20:33.789-0500");
- values.put("timestampTz", "2017-04-04 14:20:33.789-0500");
+ values.put("timestamp", Timestamp.valueOf(LOCAL_DATE_TIME));
+ values.put("timestampTz", Timestamp.valueOf(LOCAL_DATE_TIME));
values.put("uuid", "0000-00-00-00-000000");
values.put("choice", "10");
@@ -764,7 +766,8 @@ public class TestIcebergRecordConverter {
assertEquals(results.size(), 1);
GenericRecord resultRecord = results.getFirst();
- OffsetDateTime offsetDateTime = OffsetDateTime.of(LOCAL_DATE_TIME,
ZoneOffset.ofHours(-5));
+ final ZonedDateTime zonedDateTime = ZonedDateTime.of(LOCAL_DATE_TIME,
ZoneId.systemDefault());
+ OffsetDateTime offsetDateTime = zonedDateTime.toOffsetDateTime();
assertEquals("123", resultRecord.get(0, String.class));
assertEquals(Integer.valueOf(8), resultRecord.get(1, Integer.class));
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
index 2be36a6bba..421d0628a4 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
@@ -53,6 +53,11 @@ import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.zone.ZoneRules;
import java.util.Map;
import java.util.Optional;
@@ -577,6 +582,64 @@ public class TestValidateRecord {
validFlowFileInferredSchema.assertContentEquals(new
File("src/test/resources/TestValidateRecord/timestamp.json"));
}
+ @Test
+ public void testValidateCsvTimestampZoneOffset() throws
InitializationException {
+ final String recordSchema = """
+ {
+ "name": "ts",
+ "namespace": "nifi",
+ "type": "record",
+ "fields": [{
+ "name": "created",
+ "type": {
+ "type": "long", "logicalType": "timestamp-millis"
+ }
+ }]
+ }
+ """;
+
+ final String inputDateTime = "2020-01-01 12:00:00";
+ final Timestamp inputTimestamp = Timestamp.valueOf(inputDateTime);
+ final LocalDateTime inputLocalDateTime =
inputTimestamp.toLocalDateTime();
+ final String systemZoneOffsetId =
getSystemZoneOffsetId(inputLocalDateTime);
+
+ final String serializedRecord = inputDateTime + systemZoneOffsetId;
+ final String timestampFormat = "yyyy-MM-dd HH:mm:ssZZZZZ";
+
+ final String readerServiceId = "reader";
+ final String writerServiceId = "writer";
+
+ final CSVReader recordReader = new CSVReader();
+ runner.addControllerService(readerServiceId, recordReader);
+ runner.setProperty(recordReader,
ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(recordReader, SCHEMA_TEXT, recordSchema);
+ runner.setProperty(recordReader, DateTimeUtils.TIMESTAMP_FORMAT,
timestampFormat);
+ runner.enableControllerService(recordReader);
+
+ final CSVRecordSetWriter recordSetWriter = new CSVRecordSetWriter();
+ runner.addControllerService(writerServiceId, recordSetWriter);
+ runner.setProperty(recordSetWriter, "Schema Write Strategy",
JsonRecordSetWriter.AVRO_SCHEMA_ATTRIBUTE.getValue());
+ runner.setProperty(recordSetWriter, DateTimeUtils.TIMESTAMP_FORMAT,
timestampFormat);
+ runner.setProperty(recordSetWriter, CSVUtils.INCLUDE_HEADER_LINE,
Boolean.FALSE.toString());
+ runner.enableControllerService(recordSetWriter);
+
+ runner.setProperty(ValidateRecord.RECORD_READER, readerServiceId);
+ runner.setProperty(ValidateRecord.RECORD_WRITER, writerServiceId);
+ runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY,
SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(SCHEMA_TEXT, recordSchema);
+ runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER,
writerServiceId);
+ runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING,
Boolean.TRUE.toString());
+
+ runner.enqueue(serializedRecord);
+ runner.run();
+
+ runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
+ final MockFlowFile validFlowFile =
runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).getFirst();
+ final String flowFileContent = validFlowFile.getContent().trim();
+
+ assertEquals(serializedRecord, flowFileContent);
+ }
+
@Test
public void testValidateMaps() throws IOException,
InitializationException, MalformedRecordException {
final String validateSchema =
Files.readString(Paths.get("src/test/resources/TestValidateRecord/int-maps-schema.avsc"));
@@ -918,4 +981,12 @@ public class TestValidateRecord {
invalidFlowFile.assertAttributeEquals("valDetails", "Records in this
FlowFile were invalid for the following reasons: ; "
+ "The following 1 fields had values whose type did not match
the schema: [/id]");
}
+
+ private String getSystemZoneOffsetId(final LocalDateTime
inputLocalDateTime) {
+ final ZoneId systemDefaultZoneId = ZoneOffset.systemDefault();
+ final ZoneRules zoneRules = systemDefaultZoneId.getRules();
+
+ final ZoneOffset systemZoneOffset =
zoneRules.getOffset(inputLocalDateTime);
+ return systemZoneOffset.getId();
+ }
}