This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new b6d044853c NIFI-12885 Added Record Methods for Local and Offset Dates (#8502)
b6d044853c is described below
commit b6d044853cfcf568c246362bb5a337ea3bf2d951
Author: knguyen1 <[email protected]>
AuthorDate: Thu Apr 25 00:23:54 2024 -0400
NIFI-12885 Added Record Methods for Local and Offset Dates (#8502)
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/serialization/record/MapRecord.java | 24 +++-
.../apache/nifi/serialization/record/Record.java | 11 +-
.../record/ResultSetRecordSetTest.java | 5 +-
.../nifi/serialization/record/TestMapRecord.java | 126 +++++++++++++++++----
.../serialization/record/util/TriFunction.java | 32 ++++++
.../nifi/processors/standard/TestQueryRecord.java | 2 +-
6 files changed, 167 insertions(+), 33 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index ee5a5991d0..359d574990 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -32,11 +32,12 @@ import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Date;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -279,10 +280,23 @@ public class MapRecord implements Record {
}
@Override
- public Date getAsDate(final String fieldName, final String format) {
- final FieldConverter<Object, LocalDate> converter =
StandardFieldConverterRegistry.getRegistry().getFieldConverter(LocalDate.class);
- final LocalDate localDate =
converter.convertField(getValue(fieldName), Optional.ofNullable(format),
fieldName);
- return localDate == null ? null : java.sql.Date.valueOf(localDate);
+ public LocalDate getAsLocalDate(final String fieldName, final String
format) {
+ return convertFieldToDateTime(LocalDate.class, fieldName, format);
+ }
+
+ @Override
+ public LocalDateTime getAsLocalDateTime(String fieldName, String format) {
+ return convertFieldToDateTime(LocalDateTime.class, fieldName, format);
+ }
+
+ @Override
+ public OffsetDateTime getAsOffsetDateTime(final String fieldName, final
String format) {
+ return convertFieldToDateTime(OffsetDateTime.class, fieldName, format);
+ }
+
+ private <T> T convertFieldToDateTime(Class<T> clazz, String fieldName,
String format) {
+ final FieldConverter<Object, T> converter =
StandardFieldConverterRegistry.getRegistry().getFieldConverter(clazz);
+ return converter.convertField(getValue(fieldName),
Optional.ofNullable(format), fieldName);
}
@Override
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
index 8aebe8275e..0133b0175c 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
@@ -17,10 +17,13 @@
package org.apache.nifi.serialization.record;
-import java.util.Date;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+
import
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
public interface Record {
@@ -107,7 +110,11 @@ public interface Record {
Boolean getAsBoolean(String fieldName);
- Date getAsDate(String fieldName, String format);
+ LocalDate getAsLocalDate(String fieldName, String format);
+
+ LocalDateTime getAsLocalDateTime(String fieldName, String format);
+
+ OffsetDateTime getAsOffsetDateTime(String fieldName, String format);
Object[] getAsArray(String fieldName);
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
index fa594a85a6..b4fe5209dd 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
@@ -275,7 +275,6 @@ public class ResultSetRecordSetTest {
final Boolean bitValue = Boolean.FALSE;
final Boolean booleanValue = Boolean.TRUE;
final Character charValue = 'c';
- final Date dateValue = Date.valueOf(testDate);
final Timestamp timestampValue = Timestamp.valueOf(testDateTime);
final Integer integerValue = 1234567890;
final Double doubleValue = 0.12;
@@ -295,7 +294,7 @@ public class ResultSetRecordSetTest {
when(resultSet.getObject(COLUMN_NAME_BIT)).thenReturn(bitValue);
when(resultSet.getObject(COLUMN_NAME_BOOLEAN)).thenReturn(booleanValue);
when(resultSet.getObject(COLUMN_NAME_CHAR)).thenReturn(charValue);
- when(resultSet.getObject(COLUMN_NAME_DATE)).thenReturn(dateValue);
+ when(resultSet.getObject(COLUMN_NAME_DATE)).thenReturn(testDate);
when(resultSet.getTimestamp(COLUMN_NAME_TIMESTAMP)).thenReturn(timestampValue);
when(resultSet.getObject(COLUMN_NAME_INTEGER)).thenReturn(integerValue);
when(resultSet.getObject(COLUMN_NAME_DOUBLE)).thenReturn(doubleValue);
@@ -319,7 +318,7 @@ public class ResultSetRecordSetTest {
assertEquals(booleanValue, record.getAsBoolean(COLUMN_NAME_BOOLEAN));
assertEquals(charValue, record.getValue(COLUMN_NAME_CHAR));
- assertEquals(dateValue, record.getAsDate(COLUMN_NAME_DATE, null));
+ assertEquals(testDate, record.getAsLocalDate(COLUMN_NAME_DATE, null));
final Object timestampObject = record.getValue(COLUMN_NAME_TIMESTAMP);
assertEquals(timestampValue, timestampObject);
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
index b343ab0647..58a0b53061 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
@@ -21,8 +21,16 @@ import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.TriFunction;
import org.junit.jupiter.api.Test;
-
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -30,6 +38,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -38,7 +47,11 @@ import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class TestMapRecord {
+class TestMapRecord {
+
+ private static final String ISO_LOCAL_DATE = "yyyy-MM-dd";
+ private static final String ISO_LOCAL_DATE_TIME =
"yyyy-MM-dd'T'HH:mm:ss.SSS";
+ private static final String ISO_OFFSET_DATE_TIME =
"yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
private static final List<RecordField> STRING_NUMBER_FIELDS = List.of(
new RecordField("string", RecordFieldType.STRING.getDataType()),
@@ -47,7 +60,7 @@ public class TestMapRecord {
@Test
- public void testRenameClearsSerializedForm() {
+ void testRenameClearsSerializedForm() {
final Map<String, Object> values = new HashMap<>(Map.of("string",
"hello", "number", 8));
final RecordSchema schema = new
SimpleRecordSchema(STRING_NUMBER_FIELDS);
final Record record = new MapRecord(schema, values,
SerializedForm.of("Hello there", "text/unit-test"));
@@ -58,7 +71,7 @@ public class TestMapRecord {
}
@Test
- public void testRemoveClearsSerializedForm() {
+ void testRemoveClearsSerializedForm() {
final Map<String, Object> values = new HashMap<>(Map.of("string",
"hello", "number", 8));
final RecordSchema schema = new
SimpleRecordSchema(STRING_NUMBER_FIELDS);
final Record record = new MapRecord(schema, values,
SerializedForm.of("Hello there", "text/unit-test"));
@@ -69,7 +82,7 @@ public class TestMapRecord {
}
@Test
- public void testRenameRemoveInvalidFieldsToNotClearSerializedForm() {
+ void testRenameRemoveInvalidFieldsToNotClearSerializedForm() {
final Map<String, Object> values = new HashMap<>(Map.of("string",
"hello", "number", 8));
final RecordSchema schema = new
SimpleRecordSchema(STRING_NUMBER_FIELDS);
final Record record = new MapRecord(schema, values,
SerializedForm.of("Hello there", "text/unit-test"));
@@ -85,7 +98,7 @@ public class TestMapRecord {
}
@Test
- public void testIncorporateInactiveFieldsWithUpdate() {
+ void testIncorporateInactiveFieldsWithUpdate() {
final Map<String, Object> values = new HashMap<>(Map.of("string",
"hello", "number", 8));
final RecordSchema schema = new
SimpleRecordSchema(STRING_NUMBER_FIELDS);
final Record record = new MapRecord(schema, values,
SerializedForm.of("Hello there", "text/unit-test"));
@@ -105,7 +118,7 @@ public class TestMapRecord {
}
@Test
- public void testIncorporateInactiveFieldsWithConflict() {
+ void testIncorporateInactiveFieldsWithConflict() {
final Map<String, Object> values = new HashMap<>(Map.of("string",
"hello", "number", 8));
final RecordSchema schema = new
SimpleRecordSchema(STRING_NUMBER_FIELDS);
final Record record = new MapRecord(schema, values,
SerializedForm.of("Hello there", "text/unit-test"));
@@ -127,7 +140,7 @@ public class TestMapRecord {
}
@Test
- public void testDefaultValue() {
+ void testDefaultValue() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("noDefault",
RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("defaultOfHello",
RecordFieldType.STRING.getDataType(), "hello"));
@@ -141,7 +154,7 @@ public class TestMapRecord {
}
@Test
- public void testDefaultValueInGivenField() {
+ void testDefaultValueInGivenField() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("noDefault",
RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("defaultOfHello",
RecordFieldType.STRING.getDataType(), "hello"));
@@ -158,7 +171,7 @@ public class TestMapRecord {
}
@Test
- public void testIllegalDefaultValue() {
+ void testIllegalDefaultValue() {
new RecordField("hello", RecordFieldType.STRING.getDataType(), 84);
new RecordField("hello", RecordFieldType.STRING.getDataType(),
(Object) null);
new RecordField("hello", RecordFieldType.INT.getDataType(), 84);
@@ -168,15 +181,11 @@ public class TestMapRecord {
}
private Set<String> set(final String... values) {
- final Set<String> set = new LinkedHashSet<>();
- for (final String value : values) {
- set.add(value);
- }
- return set;
+ return new LinkedHashSet<>(Arrays.asList(values));
}
@Test
- public void testAliasOneValue() {
+ void testAliasOneValue() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("foo",
RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
@@ -191,7 +200,7 @@ public class TestMapRecord {
}
@Test
- public void testAliasConflictingValues() {
+ void testAliasConflictingValues() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("foo",
RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
@@ -207,7 +216,7 @@ public class TestMapRecord {
}
@Test
- public void testAliasConflictingAliasValues() {
+ void testAliasConflictingAliasValues() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("foo",
RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
@@ -223,7 +232,7 @@ public class TestMapRecord {
}
@Test
- public void testAliasInGivenField() {
+ void testAliasInGivenField() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("foo",
RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
@@ -246,7 +255,7 @@ public class TestMapRecord {
@Test
- public void testDefaultValueWithAliasValue() {
+ void testDefaultValueWithAliasValue() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("foo",
RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz")));
@@ -262,7 +271,7 @@ public class TestMapRecord {
}
@Test
- public void testDefaultValueWithAliasesDefined() {
+ void testDefaultValueWithAliasesDefined() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("foo",
RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz")));
@@ -275,7 +284,7 @@ public class TestMapRecord {
}
@Test
- public void testNestedSchema() {
+ void testNestedSchema() {
final String FOO_TEST_VAL = "test!";
final String NESTED_RECORD_VALUE = "Hello, world!";
@@ -323,4 +332,77 @@ public class TestMapRecord {
}
}
+ @ParameterizedTest
+ @MethodSource("provideLocalDates")
+ void testGettingLocalDate(final String input, final String format,
LocalDate expectedDate) {
+ executeDateTimeTest(input, format, expectedDate,
MapRecord::getAsLocalDate);
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideLocalDateTimes")
+ void testGettingLocalDateTime(final String input, final String format,
LocalDateTime expectedDateTime) {
+ executeDateTimeTest(input, format, expectedDateTime,
MapRecord::getAsLocalDateTime);
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideOffsetDateTimes")
+ void testGettingOffsetDateTime(final String input, final String format,
OffsetDateTime expectedOffsetDateTime) {
+ executeDateTimeTest(input, format, expectedOffsetDateTime,
MapRecord::getAsOffsetDateTime);
+ }
+
+ private <T> void executeDateTimeTest(final String input,
+ final String format,
+ final Object expectedDateTime,
+ TriFunction<MapRecord, String, String, T> dateTimeFunction) {
+ // create a `MapRecord` from the input
+ final List<RecordField> fields = new ArrayList<>();
+ final String timestampFieldName = "timestamp";
+ fields.add(new RecordField(timestampFieldName,
RecordFieldType.TIMESTAMP.getDataType()));
+
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+ final HashMap<String, Object> item = new HashMap<>();
+ item.put(timestampFieldName, input);
+ final MapRecord testRecord = new MapRecord(schema, item);
+
+ // apply the datetime function to the record and compare
+ final T actualDateTime = dateTimeFunction.apply(testRecord,
timestampFieldName, format);
+ assertEquals(expectedDateTime, actualDateTime);
+ }
+
+ private static Stream<Arguments> provideLocalDates() {
+ return Stream.of(
+ Arguments.of("2022-01-01", ISO_LOCAL_DATE,
LocalDate.parse("2022-01-01")),
+ Arguments.of("2022-01-01T12:34:56.789", ISO_LOCAL_DATE_TIME,
LocalDate.parse("2022-01-01")),
+ Arguments.of("2017-06-23T01:02:03.456", ISO_LOCAL_DATE_TIME,
LocalDate.parse("2017-06-23")),
+ Arguments.of("2020-02-29T23:59:59.999", ISO_LOCAL_DATE_TIME,
LocalDate.parse("2020-02-29")), // leap year
+ Arguments.of("2024-03-10T02:00:00.000", ISO_LOCAL_DATE_TIME,
LocalDate.parse("2024-03-10")), // DST transition
+ // test minimum and maximum values
+ Arguments.of("0001-01-01T00:00:00.000", ISO_LOCAL_DATE_TIME,
LocalDate.parse("0001-01-01")),
+ Arguments.of("9999-12-31T23:59:59.999", ISO_LOCAL_DATE_TIME,
LocalDate.parse("9999-12-31"))
+ );
+ }
+
+ private static Stream<Arguments> provideLocalDateTimes() {
+ return Stream.of(
+ Arguments.of("2022-01-01T12:34:56.789", ISO_LOCAL_DATE_TIME,
LocalDateTime.parse("2022-01-01T12:34:56.789")),
+ Arguments.of("2017-06-23T01:02:03.456", ISO_LOCAL_DATE_TIME,
LocalDateTime.parse("2017-06-23T01:02:03.456")),
+ Arguments.of("2020-02-29T23:59:59.999", ISO_LOCAL_DATE_TIME,
LocalDateTime.parse("2020-02-29T23:59:59.999")), // leap year
+ Arguments.of("2024-03-10T02:00:00.000", ISO_LOCAL_DATE_TIME,
LocalDateTime.parse("2024-03-10T02:00:00.000")), // DST transition
+ // test minimum and maximum values
+ Arguments.of("0001-01-01T00:00:00.000", ISO_LOCAL_DATE_TIME,
LocalDateTime.parse("0001-01-01T00:00:00.000")),
+ Arguments.of("9999-12-31T23:59:59.999", ISO_LOCAL_DATE_TIME,
LocalDateTime.parse("9999-12-31T23:59:59.999"))
+ );
+ }
+
+ private static Stream<Arguments> provideOffsetDateTimes() {
+ return Stream.of(
+ Arguments.of("2022-01-01T12:34:56.789+00:00",
ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("2022-01-01T12:34:56.789+00:00")),
+ Arguments.of("2017-06-23T01:02:03.456+00:00",
ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("2017-06-23T01:02:03.456+00:00")),
+ Arguments.of("2020-02-29T23:59:59.999+00:00",
ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("2020-02-29T23:59:59.999+00:00")),
// leap year
+ Arguments.of("2024-03-10T02:00:00.000+00:00",
ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("2024-03-10T02:00:00.000+00:00")),
// DST transition
+ // test minimum and maximum values
+ Arguments.of("0001-01-01T00:00:00.000+00:00",
ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("0001-01-01T00:00:00.000+00:00")),
+ Arguments.of("9999-12-31T23:59:59.999+00:00",
ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("9999-12-31T23:59:59.999+00:00"))
+ );
+ }
}
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/util/TriFunction.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/util/TriFunction.java
new file mode 100644
index 0000000000..feb8c227d9
--- /dev/null
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/util/TriFunction.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.util;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+@FunctionalInterface
+public interface TriFunction<S, T, U, R> {
+
+ R apply(S s, T t, U u);
+
+ default <V> TriFunction<S, T, U, V> andThen(Function<? super R, ? extends
V> after) {
+ Objects.requireNonNull(after);
+ return (S s, T t, U u) -> after.apply(apply(s, t, u));
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index 86c1dca4df..308bd2858f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -135,7 +135,7 @@ public class TestQueryRecord {
assertArrayEquals(new String[] { "red", "green"}, (Object[])
output.getValue("colors"));
assertArrayEquals(new String[] { "John Doe", "Jane Doe"}, (Object[])
output.getValue("names"));
- assertEquals(java.sql.Date.valueOf(ISO_DATE),
output.getAsDate("joinTime", ISO_DATE_FORMAT));
+ assertEquals(java.time.LocalDate.parse(ISO_DATE),
output.getAsLocalDate("joinTime", ISO_DATE_FORMAT));
assertEquals(Double.valueOf(180.8D), output.getAsDouble("weight"));
}