This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 3411251 NIFI-6640 - UNION/CHOICE types not handled correctly 3
important changes: 1. FieldTypeInference had a bug when dealing with multiple
datatypes for the same field where some (but not all) were in a
wider-than-the-other relationship. Before: Some datatypes could be lost.
String was wider than any other. After: Consistent behaviour. String is NOT
wider than any other. 2. Choosing a datatype for a value from a ChoiceDataType:
Before it chose the first compatible datatyp [...]
3411251 is described below
commit 34112519c2dde19d704ef624e62e51b399cf1ce7
Author: Tamas Palfy <[email protected]>
AuthorDate: Fri Sep 13 16:39:24 2019 +0200
NIFI-6640 - UNION/CHOICE types not handled correctly
3 important changes:
1. FieldTypeInference had a bug when dealing with multiple datatypes for
the same field where some (but not all) were in a wider-than-the-other
relationship.
Before: Some datatypes could be lost. String was wider than any other.
After: Consistent behaviour. String is NOT wider than any other.
2. Choosing a datatype for a value from a ChoiceDataType:
Before: It chose the first compatible datatype as the basis of conversion.
After: It tries to find the most suitable datatype.
3. Conversion of a value of avro union type:
Before: It chose the first compatible datatype as the basis of conversion.
After: It tries to find the most suitable datatype.
Change: In the RecordFieldType enum, TIMESTAMP was moved ahead of DATE.
This closes #3724.
Signed-off-by: Mark Payne <[email protected]>
---
.../nifi/serialization/record/RecordFieldType.java | 10 +-
.../serialization/record/util/DataTypeUtils.java | 91 ++++-
.../serialization/record/TestDataTypeUtils.java | 224 ++++++++++++
.../java/org/apache/nifi/avro/AvroTypeUtil.java | 11 +
.../org/apache/nifi/avro/TestAvroTypeUtil.java | 53 +++
.../nifi-standard-processors/pom.xml | 8 +
.../processors/standard/AbstractConversionIT.java | 388 +++++++++++++++++++++
.../standard/ConversionWithExplicitSchemaIT.java | 88 +++++
.../standard/ConversionWithSchemaInferenceIT.java | 51 +++
.../TestConversions/data.int_float_string.json | 22 ++
.../data.int_float_string.with_header.csv | 8 +
.../data.int_float_string.with_schema.avro | Bin 0 -> 302 bytes
.../data.int_float_string.with_schema.json.to.avro | Bin 0 -> 322 bytes
.../data.int_float_string.without_header.csv | 7 +
.../data.int_float_string.without_schema.avro | Bin 0 -> 51 bytes
.../TestConversions/data.int_float_string.xml | 31 ++
.../resources/TestConversions/explicit.schema.json | 23 ++
.../nifi/schema/inference/FieldTypeInference.java | 35 +-
.../apache/nifi/csv/TestCSVSchemaInference.java | 16 +-
.../org/apache/nifi/csv/TestWriteCSVResult.java | 2 +-
.../json/TestInferJsonSchemaAccessStrategy.java | 12 +-
.../schema/inference/TestFieldTypeInference.java | 182 ++++++++++
.../org/apache/nifi/xml/TestInferXmlSchema.java | 8 +-
.../src/test/resources/json/output/dataTypes.json | 2 +-
24 files changed, 1233 insertions(+), 39 deletions(-)
diff --git
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
index de9aa58..413c128 100644
---
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
+++
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
@@ -73,6 +73,11 @@ public enum RecordFieldType {
DOUBLE("double", FLOAT),
/**
+ * A timestamp field type. Fields of this type use a {@code
java.sql.Timestamp} value.
+ */
+ TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
+
+ /**
* A date field type. Fields of this type use a {@code java.sql.Date}
value.
*/
DATE("date", "yyyy-MM-dd"),
@@ -83,11 +88,6 @@ public enum RecordFieldType {
TIME("time", "HH:mm:ss"),
/**
- * A timestamp field type. Fields of this type use a {@code
java.sql.Timestamp} value.
- */
- TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
-
- /**
* A char field type. Fields of this type use a {@code char} value.
*/
CHAR("char"),
diff --git
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 0686dcf..308cafa 100644
---
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.io.Reader;
+import java.lang.reflect.Array;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@@ -47,16 +48,21 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Queue;
import java.util.Set;
import java.util.TimeZone;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@@ -225,17 +231,75 @@ public class DataTypeUtils {
}
public static DataType chooseDataType(final Object value, final
ChoiceDataType choiceType) {
- for (final DataType subType : choiceType.getPossibleSubTypes()) {
- if (isCompatibleDataType(value, subType)) {
- if (subType.getFieldType() == RecordFieldType.CHOICE) {
- return chooseDataType(value, (ChoiceDataType) subType);
+ Queue<DataType> possibleSubTypes = new
LinkedList<>(choiceType.getPossibleSubTypes());
+ List<DataType> compatibleSimpleSubTypes = new ArrayList<>();
+
+ DataType subType;
+ while ((subType = possibleSubTypes.poll()) != null) {
+ if (subType instanceof ChoiceDataType) {
+ possibleSubTypes.addAll(((ChoiceDataType)
subType).getPossibleSubTypes());
+ } else {
+ if (isCompatibleDataType(value, subType)) {
+ compatibleSimpleSubTypes.add(subType);
+ }
+ }
+ }
+
+ int nrOfCompatibleSimpleSubTypes = compatibleSimpleSubTypes.size();
+
+ final DataType chosenSimpleType;
+ if (nrOfCompatibleSimpleSubTypes == 0) {
+ chosenSimpleType = null;
+ } else if (nrOfCompatibleSimpleSubTypes == 1) {
+ chosenSimpleType = compatibleSimpleSubTypes.get(0);
+ } else {
+ chosenSimpleType = findMostSuitableType(value,
compatibleSimpleSubTypes, Function.identity())
+ .orElse(compatibleSimpleSubTypes.get(0));
+ }
+
+ return chosenSimpleType;
+ }
+
+ public static <T> Optional<T> findMostSuitableType(Object value, List<T>
types, Function<T, DataType> dataTypeMapper) {
+ if (value instanceof String) {
+ return findMostSuitableTypeByStringValue((String) value, types,
dataTypeMapper);
+ } else {
+ DataType inferredDataType = inferDataType(value, null);
+
+ if (inferredDataType != null &&
!inferredDataType.getFieldType().equals(RecordFieldType.STRING)) {
+ for (T type : types) {
+ if (inferredDataType.equals(dataTypeMapper.apply(type))) {
+ return Optional.of(type);
+ }
}
- return subType;
+ for (T type : types) {
+ if (getWiderType(dataTypeMapper.apply(type),
inferredDataType).isPresent()) {
+ return Optional.of(type);
+ }
+ }
}
}
- return null;
+ return Optional.empty();
+ }
+
+ public static <T> Optional<T> findMostSuitableTypeByStringValue(String
valueAsString, List<T> types, Function<T, DataType> dataTypeMapper) {
+ // Sorting based on the RecordFieldType enum ordering looks
appropriate here as we want simpler types
+ // first and the enum's ordering seems to reflect that
+ Collections.sort(types, Comparator.comparing(type ->
dataTypeMapper.apply(type).getFieldType()));
+
+ for (T type : types) {
+ try {
+ if (isCompatibleDataType(valueAsString,
dataTypeMapper.apply(type))) {
+ return Optional.of(type);
+ }
+ } catch (Exception e) {
+ logger.error("Exception thrown while checking if '" +
valueAsString + "' is compatible with '" + type + "'", e);
+ }
+ }
+
+ return Optional.empty();
}
public static Record toRecord(final Object value, final RecordSchema
recordSchema, final String fieldName) {
@@ -440,12 +504,12 @@ public class DataTypeUtils {
// final DataType elementDataType = inferDataType(valueFromMap,
RecordFieldType.STRING.getDataType());
// return RecordFieldType.MAP.getMapDataType(elementDataType);
}
- if (value instanceof Object[]) {
- final Object[] array = (Object[]) value;
-
+ if (value.getClass().isArray()) {
DataType mergedDataType = null;
- for (final Object arrayValue : array) {
- final DataType inferredDataType = inferDataType(arrayValue,
RecordFieldType.STRING.getDataType());
+
+ int length = Array.getLength(value);
+ for(int index = 0; index < length; index++) {
+ final DataType inferredDataType =
inferDataType(Array.get(value, index), RecordFieldType.STRING.getDataType());
mergedDataType = mergeDataTypes(mergedDataType,
inferredDataType);
}
@@ -1545,7 +1609,10 @@ public class DataTypeUtils {
possibleTypes.add(otherDataType);
}
- return RecordFieldType.CHOICE.getChoiceDataType(new
ArrayList<>(possibleTypes));
+ ArrayList<DataType> possibleChildTypes = new
ArrayList<>(possibleTypes);
+ Collections.sort(possibleChildTypes,
Comparator.comparing(DataType::getFieldType));
+
+ return
RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
}
}
diff --git
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index 89a0490..30b2a60 100644
---
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -18,6 +18,7 @@
package org.apache.nifi.serialization.record;
import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import org.junit.Test;
@@ -27,10 +28,17 @@ import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -324,4 +332,220 @@ public class TestDataTypeUtils {
}
assertNotNull(e);
}
+
+ @Test
+ public void
testFindMostSuitableTypeByStringValueShouldReturnEvenWhenOneTypeThrowsException()
{
+ String valueAsString = "value";
+
+ String nonMatchingType = "nonMatchingType";
+ String throwsExceptionType = "throwsExceptionType";
+ String matchingType = "matchingType";
+
+ List<String> types = Arrays.asList(
+ nonMatchingType,
+ throwsExceptionType,
+ matchingType
+ );
+ Optional<String> expected = Optional.of(matchingType);
+
+ AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+
+ Function<String, DataType> dataTypeMapper = type -> {
+ if (type.equals(nonMatchingType)) {
+ return RecordFieldType.BOOLEAN.getDataType();
+ } else if (type.equals(throwsExceptionType)) {
+ return new DataType(RecordFieldType.DATE, null) {
+ @Override
+ public String getFormat() {
+ exceptionThrown.set(true);
+ throw new RuntimeException("maching error");
+ }
+ };
+ } else if (type.equals(matchingType)) {
+ return RecordFieldType.STRING.getDataType();
+ }
+
+ return null;
+ };
+
+ Optional<String> actual =
DataTypeUtils.findMostSuitableTypeByStringValue(valueAsString, types,
dataTypeMapper);
+ assertTrue("Exception not thrown during test as intended.",
exceptionThrown.get());
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testChooseDataTypeWhenInt_vs_INT_FLOAT_ThenShouldReturnINT() {
+ // GIVEN
+ List<DataType> dataTypes = Arrays.asList(
+ RecordFieldType.INT.getDataType(),
+ RecordFieldType.FLOAT.getDataType()
+ );
+
+ Object value = 1;
+ DataType expected = RecordFieldType.INT.getDataType();
+
+ // WHEN
+ // THEN
+ testChooseDataTypeAlsoReverseTypes(value, dataTypes, expected);
+ }
+
+ @Test
+ public void
testChooseDataTypeWhenFloat_vs_INT_FLOAT_ThenShouldReturnFLOAT() {
+ // GIVEN
+ List<DataType> dataTypes = Arrays.asList(
+ RecordFieldType.INT.getDataType(),
+ RecordFieldType.FLOAT.getDataType()
+ );
+
+ Object value = 1.5f;
+ DataType expected = RecordFieldType.FLOAT.getDataType();
+
+ // WHEN
+ // THEN
+ testChooseDataTypeAlsoReverseTypes(value, dataTypes, expected);
+ }
+
+ @Test
+ public void
testChooseDataTypeWhenHasChoiceThenShouldReturnSingleMatchingFromChoice() {
+ // GIVEN
+ List<DataType> dataTypes = Arrays.asList(
+ RecordFieldType.INT.getDataType(),
+ RecordFieldType.DOUBLE.getDataType(),
+ RecordFieldType.CHOICE.getChoiceDataType(
+ RecordFieldType.FLOAT.getDataType(),
+ RecordFieldType.STRING.getDataType()
+ )
+ );
+
+ Object value = 1.5f;
+ DataType expected = RecordFieldType.FLOAT.getDataType();
+
+ // WHEN
+ // THEN
+ testChooseDataTypeAlsoReverseTypes(value, dataTypes, expected);
+ }
+
+ private <E> void testChooseDataTypeAlsoReverseTypes(Object value,
List<DataType> dataTypes, DataType expected) {
+ testChooseDataType(dataTypes, value, expected);
+ Collections.reverse(dataTypes);
+ testChooseDataType(dataTypes, value, expected);
+ }
+
+ private void testChooseDataType(List<DataType> dataTypes, Object value,
DataType expected) {
+ // GIVEN
+ ChoiceDataType choiceDataType = (ChoiceDataType)
RecordFieldType.CHOICE.getChoiceDataType(dataTypes.toArray(new
DataType[dataTypes.size()]));
+
+ // WHEN
+ DataType actual = DataTypeUtils.chooseDataType(value, choiceDataType);
+
+ // THEN
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testFindMostSuitableTypeWithBoolean() {
+ testFindMostSuitableType(true, RecordFieldType.BOOLEAN.getDataType());
+ }
+
+ @Test
+ public void testFindMostSuitableTypeWithByte() {
+ testFindMostSuitableType(Byte.valueOf((byte) 123),
RecordFieldType.BYTE.getDataType());
+ }
+
+ @Test
+ public void testFindMostSuitableTypeWithShort() {
+ testFindMostSuitableType(Short.valueOf((short) 123),
RecordFieldType.SHORT.getDataType());
+ }
+
+ @Test
+ public void testFindMostSuitableTypeWithInt() {
+ testFindMostSuitableType(123, RecordFieldType.INT.getDataType());
+ }
+
+ @Test
+ public void testFindMostSuitableTypeWithLong() {
+ testFindMostSuitableType(123L, RecordFieldType.LONG.getDataType());
+ }
+
+ @Test
+ public void testFindMostSuitableTypeWithBigInt() {
+ testFindMostSuitableType(BigInteger.valueOf(123L),
RecordFieldType.BIGINT.getDataType());
+ }
+
+ @Test
+ public void testFindMostSuitableTypeWithFloat() {
+ testFindMostSuitableType(12.3F, RecordFieldType.FLOAT.getDataType());
+ }
+
+ @Test
+ public void testFindMostSuitableTypeWithDouble() {
+ testFindMostSuitableType(12.3, RecordFieldType.DOUBLE.getDataType());
+ }
+
+ @Test
+ public void testFindMostSuitableTypeWithDate() {
+ testFindMostSuitableType("1111-11-11",
RecordFieldType.DATE.getDataType());
+ }
+
+ @Test
+ public void testFindMostSuitableTypeWithTime() {
+ testFindMostSuitableType("11:22:33",
RecordFieldType.TIME.getDataType());
+ }
+
+ @Test
+ public void testFindMostSuitableTypeWithTimeStamp() {
+ testFindMostSuitableType("1111-11-11 11:22:33",
RecordFieldType.TIMESTAMP.getDataType());
+ }
+
+ @Test
+ public void testFindMostSuitableTypeWithChar() {
+ testFindMostSuitableType('a', RecordFieldType.CHAR.getDataType());
+ }
+
+ @Test
+ public void testFindMostSuitableTypeWithStringShouldReturnChar() {
+ testFindMostSuitableType("abc", RecordFieldType.CHAR.getDataType());
+ }
+
+ @Test
+ public void testFindMostSuitableTypeWithString() {
+ testFindMostSuitableType("abc", RecordFieldType.STRING.getDataType(),
RecordFieldType.CHAR.getDataType());
+ }
+
+ @Test
+ public void testFindMostSuitableTypeWithArray() {
+ testFindMostSuitableType(new int[]{1, 2, 3},
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()));
+ }
+
+ private void testFindMostSuitableType(Object value, DataType expected,
DataType... filtered) {
+ List<DataType> filteredOutDataTypes =
Arrays.stream(filtered).collect(Collectors.toList());
+
+ // GIVEN
+ List<DataType> unexpectedTypes =
Arrays.stream(RecordFieldType.values())
+ .flatMap(recordFieldType -> {
+ Stream<DataType> dataTypeStream;
+
+ if (RecordFieldType.ARRAY.equals(recordFieldType)) {
+ dataTypeStream =
Arrays.stream(RecordFieldType.values()).map(elementType ->
RecordFieldType.ARRAY.getArrayDataType(elementType.getDataType()));
+ } else {
+ dataTypeStream =
Stream.of(recordFieldType.getDataType());
+ }
+
+ return dataTypeStream;
+ })
+ .filter(dataType -> !dataType.equals(expected))
+ .filter(dataType -> !filteredOutDataTypes.contains(dataType))
+ .collect(Collectors.toList());
+
+ IntStream.rangeClosed(0, unexpectedTypes.size()).forEach(insertIndex
-> {
+ List<DataType> allTypes = new LinkedList<>(unexpectedTypes);
+ allTypes.add(insertIndex, expected);
+
+ // WHEN
+ Optional<DataType> actual =
DataTypeUtils.findMostSuitableType(value, allTypes, Function.identity());
+
+ // THEN
+ assertEquals(Optional.ofNullable(expected), actual);
+ });
+ }
}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 2bc95bc..a0eea8b 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -74,6 +74,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.stream.Collectors;
public class AvroTypeUtil {
private static final Logger logger =
LoggerFactory.getLogger(AvroTypeUtil.class);
@@ -877,6 +878,16 @@ public class AvroTypeUtil {
*/
private static Object convertUnionFieldValue(final Object originalValue,
final Schema fieldSchema, final Function<Schema, Object> conversion, final
String fieldName) {
boolean foundNonNull = false;
+
+ Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType(
+ originalValue,
+ fieldSchema.getTypes().stream().filter(schema ->
schema.getType() != Type.NULL).collect(Collectors.toList()),
+ subSchema -> AvroTypeUtil.determineDataType(subSchema)
+ );
+ if (mostSuitableType.isPresent()) {
+ return conversion.apply(mostSuitableType.get());
+ }
+
for (final Schema subSchema : fieldSchema.getTypes()) {
if (subSchema.getType() == Type.NULL) {
continue;
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index 0ecbe25..a89ebe4 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -55,6 +55,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -587,4 +588,56 @@ public class TestAvroTypeUtil {
assertNotNull(((Record)inner).get("Message"));
}
}
+
+ @Test
+ public void
testConvertToAvroObjectWhenIntVSUnion_INT_FLOAT_ThenReturnInt() {
+ // GIVEN
+ List<Schema.Type> schemaTypes = Arrays.asList(
+ Schema.Type.INT,
+ Schema.Type.FLOAT
+ );
+ Integer rawValue = 1;
+
+ Object expected = 1;
+
+ // WHEN
+ // THEN
+ testConvertToAvroObjectAlsoReverseSchemaList(expected, rawValue,
schemaTypes);
+ }
+
+ @Test
+ public void
testConvertToAvroObjectWhenFloatVSUnion_INT_FLOAT_ThenReturnFloat() {
+ // GIVEN
+ List<Schema.Type> schemaTypes = Arrays.asList(
+ Schema.Type.INT,
+ Schema.Type.FLOAT
+ );
+ Float rawValue = 1.5f;
+
+ Object expected = 1.5f;
+
+ // WHEN
+ // THEN
+ testConvertToAvroObjectAlsoReverseSchemaList(expected, rawValue,
schemaTypes);
+ }
+
+ private void testConvertToAvroObjectAlsoReverseSchemaList(Object expected,
Object rawValue, List<Schema.Type> schemaTypes) {
+ // GIVEN
+ List<Schema> schemaList = schemaTypes.stream()
+ .map(Schema::create)
+ .collect(Collectors.toList());
+
+ // WHEN
+ Object actual = AvroTypeUtil.convertToAvroObject(rawValue,
Schema.createUnion(schemaList), StandardCharsets.UTF_16);
+
+ // THEN
+ assertEquals(expected, actual);
+
+ // WHEN
+ Collections.reverse(schemaList);
+ Object actualAfterReverse = AvroTypeUtil.convertToAvroObject(rawValue,
Schema.createUnion(schemaList), StandardCharsets.UTF_16);
+
+ // THEN
+ assertEquals(expected, actualAfterReverse);
+ }
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 0b15be0..ed51c40 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -411,6 +411,14 @@
<exclude>src/test/resources/TestMergeContent/head</exclude>
<exclude>src/test/resources/TestMergeContent/user.avsc</exclude>
<exclude>src/test/resources/TestMergeContent/place.avsc</exclude>
+
<exclude>src/test/resources/TestConversions/data.int_float_string.json</exclude>
+
<exclude>src/test/resources/TestConversions/data.int_float_string.with_header.csv</exclude>
+
<exclude>src/test/resources/TestConversions/data.int_float_string.without_header.csv</exclude>
+
<exclude>src/test/resources/TestConversions/data.int_float_string.xml</exclude>
+
<exclude>src/test/resources/TestConversions/data.int_float_string.with_schema.avro</exclude>
+
<exclude>src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro</exclude>
+
<exclude>src/test/resources/TestConversions/data.int_float_string.without_schema.avro</exclude>
+
<exclude>src/test/resources/TestConversions/explicit.schema.json</exclude>
<exclude>src/test/resources/TestConvertJSONToSQL/person-1.json</exclude>
<exclude>src/test/resources/TestConvertJSONToSQL/persons.json</exclude>
<exclude>src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json</exclude>
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractConversionIT.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractConversionIT.java
new file mode 100644
index 0000000..9b458a2
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractConversionIT.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.avro.AvroReader;
+import org.apache.nifi.avro.AvroReaderWithEmbeddedSchema;
+import org.apache.nifi.avro.AvroRecordSetWriter;
+import org.apache.nifi.csv.CSVReader;
+import org.apache.nifi.csv.CSVRecordSetWriter;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.xml.XMLReader;
+import org.apache.nifi.xml.XMLRecordSetWriter;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class AbstractConversionIT {
+ protected RecordReaderFactory reader;
+ protected Consumer<TestRunner> inputHandler;
+ protected Consumer<TestRunner> readerConfigurer;
+
+ protected RecordSetWriterFactory writer;
+ protected Consumer<MockFlowFile> resultHandler;
+ protected Consumer<TestRunner> writerConfigurer;
+
+ @Before
+ public void setUp() throws Exception {
+ reader = null;
+ inputHandler = null;
+ readerConfigurer = null;
+
+ writer = null;
+ resultHandler = null;
+ writerConfigurer = null;
+ }
+
+ @Test
+ public void testCsvToJson() throws Exception {
+ fromCsv(csvPostfix());
+ toJson(jsonPostfix());
+
+ testConversion(reader, readerConfigurer, writer, writerConfigurer,
inputHandler, resultHandler);
+ }
+
+ @Test
+ public void testCsvToAvro() throws Exception {
+ fromCsv(csvPostfix());
+ toAvro(avroPostfix());
+
+ testConversion(reader, readerConfigurer, writer, writerConfigurer,
inputHandler, resultHandler);
+ }
+
+ @Test
+ public void testCsvToAvroToCsv() throws Exception {
+ fromCsv(csvPostfix());
+
+ AvroRecordSetWriter writer2 = new AvroRecordSetWriter();
+ AvroReader reader2 = new AvroReader();
+
+ toCsv(csvPostfix());
+
+ testChain(writer2, reader2);
+ }
+
+ @Test
+ public void testCsvToXml() throws Exception {
+ fromCsv(csvPostfix());
+ toXml(xmlPostfix());
+
+ testConversion(reader, readerConfigurer, writer, writerConfigurer,
inputHandler, resultHandler);
+ }
+
+ @Test
+ public void testJsonToCsv() throws Exception {
+ fromJson(jsonPostfix());
+ toCsv(csvPostfix());
+
+ testConversion(reader, readerConfigurer, writer, writerConfigurer,
inputHandler, resultHandler);
+ }
+
+ @Test
+ public void testJsonToAvro() throws Exception {
+ fromJson(jsonPostfix());
+ toAvro(avroPostfix());
+
+ testConversion(reader, readerConfigurer, writer, writerConfigurer,
inputHandler, resultHandler);
+ }
+
+ @Test
+ public void testJsonToAvroToJson() throws Exception {
+ fromJson(jsonPostfix());
+
+ AvroRecordSetWriter writer2 = new AvroRecordSetWriter();
+ AvroReader reader2 = new AvroReader();
+
+ toJson(jsonPostfix());
+
+ testChain(writer2, reader2);
+ }
+
+ @Test
+ public void testAvroToCsv() throws Exception {
+ fromAvro(avroPostfix());
+ toCsv(csvPostfix());
+
+ testConversion(reader, readerConfigurer, writer, writerConfigurer,
inputHandler, resultHandler);
+ }
+
+ @Test
+ public void testAvroToJson() throws Exception {
+ fromAvro(avroPostfix());
+ toJson(jsonPostfix());
+
+ testConversion(reader, readerConfigurer, writer, writerConfigurer,
inputHandler, resultHandler);
+ }
+
+ @Test
+ public void testAvroToXml() throws Exception {
+ fromAvro(avroPostfix());
+ toXml(xmlPostfix());
+
+ testConversion(reader, readerConfigurer, writer, writerConfigurer,
inputHandler, resultHandler);
+ }
+
+ @Test
+ public void testXmlToCsv() throws Exception {
+ fromXml(xmlPostfix());
+ toCsv(csvPostfix());
+
+ testConversion(reader, readerConfigurer, writer, writerConfigurer,
inputHandler, resultHandler);
+ }
+
+ @Test
+ public void testXmlToJson() throws Exception {
+ fromXml(xmlPostfix());
+ toJson(jsonPostfix());
+
+ testConversion(reader, readerConfigurer, writer, writerConfigurer,
inputHandler, resultHandler);
+ }
+
+ @Test
+ public void testXmlToAvro() throws Exception {
+ fromXml(xmlPostfix());
+ toAvro(avroPostfix());
+
+ testConversion(reader, readerConfigurer, writer, writerConfigurer,
inputHandler, resultHandler);
+ }
+
+ @Test
+ public void testXmlToAvroToXml() throws Exception {
+ fromXml(xmlPostfix());
+
+ AvroRecordSetWriter writer2 = new AvroRecordSetWriter();
+ AvroReader reader2 = new AvroReader();
+
+ toXml(xmlPostfix());
+
+ testChain(writer2, reader2);
+ }
+
+ abstract protected String csvPostfix();
+
+ abstract protected String jsonPostfix();
+
+ abstract protected String avroPostfix();
+
+ abstract protected String xmlPostfix();
+
+ protected void commonReaderConfiguration(TestRunner testRunner) {
+ }
+
+ protected void commonWriterConfiguration(TestRunner testRunner) {
+ }
+
+ protected void fromCsv(String postfix) {
+ reader = new CSVReader();
+ inputHandler = stringInputHandler(getContent(postfix));
+
+ readerConfigurer = testRunner -> {
+ commonReaderConfiguration(testRunner);
+ };
+ }
+
+ protected void fromJson(String postfix) {
+ reader = new JsonTreeReader();
+ inputHandler = stringInputHandler(getContent(postfix));
+
+ readerConfigurer = testRunner -> {
+ commonReaderConfiguration(testRunner);
+ };
+ }
+
+ protected void fromXml(String postfix) {
+ reader = new XMLReader();
+ inputHandler = stringInputHandler(getContent(postfix));
+
+ readerConfigurer = testRunner -> {
+ commonReaderConfiguration(testRunner);
+ testRunner.setProperty(reader, XMLReader.RECORD_FORMAT,
XMLReader.RECORD_ARRAY);
+ };
+ }
+
+ protected void fromAvro(String postfix) {
+ reader = new AvroReader();
+ inputHandler = byteInputHandler(getByteContent(postfix));
+
+ readerConfigurer = testRunner -> {
+ commonReaderConfiguration(testRunner);
+ };
+ }
+
+ protected void toCsv(String postfix) {
+ writer = new CSVRecordSetWriter();
+ resultHandler = stringOutputHandler(getContent(postfix));
+
+ writerConfigurer = testRunner -> {
+ commonWriterConfiguration(testRunner);
+ };
+ }
+
+ protected void toJson(String postfix) {
+ writer = new JsonRecordSetWriter();
+ resultHandler = stringOutputHandler(getContent(postfix));
+
+ writerConfigurer = testRunner -> {
+ commonWriterConfiguration(testRunner);
+ testRunner.setProperty(writer, "Pretty Print JSON", "true");
+ };
+ }
+
+ protected void toXml(String postfix) {
+ writer = new XMLRecordSetWriter();
+ resultHandler = stringOutputHandler(getContent(postfix));
+
+ writerConfigurer = testRunner -> {
+ commonWriterConfiguration(testRunner);
+ testRunner.setProperty(writer, "pretty_print_xml", "true");
+ testRunner.setProperty(writer, "root_tag_name", "root");
+ testRunner.setProperty(writer, "record_tag_name", "nifiRecord");
+ };
+ }
+
+ protected void toAvro(String postfix) {
+ writer = new AvroRecordSetWriter();
+ resultHandler = mockFlowFile -> {
+ try {
+ List<Map<String, Object>> expected =
getRecords(getByteContent(postfix));
+ List<Map<String, Object>> actual =
getRecords(mockFlowFile.toByteArray());
+
+ assertEquals(expected, actual);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ writerConfigurer = testRunner -> {
+ commonWriterConfiguration(testRunner);
+ };
+ }
+
+ protected Consumer<TestRunner> stringInputHandler(String input) {
+ return testRunner -> testRunner.enqueue(input);
+ }
+
+ protected Consumer<TestRunner> byteInputHandler(byte[] input) {
+ return testRunner -> testRunner.enqueue(input);
+ }
+
+ protected Consumer<MockFlowFile> stringOutputHandler(String expected) {
+ return mockFlowFile -> mockFlowFile.assertContentEquals(expected);
+ }
+
+ protected String getContent(String postfix) {
+ return new String(getByteContent(postfix));
+ }
+
+ protected byte[] getByteContent(String postfix) {
+ try {
+ return
Files.readAllBytes(Paths.get("src/test/resources/TestConversions/data.int_float_string."
+ postfix));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected List<Map<String, Object>> getRecords(byte[] avroData) throws
IOException, MalformedRecordException {
+ try (RecordReader reader = new AvroReaderWithEmbeddedSchema(new
ByteArrayInputStream(avroData));) {
+ return getRecords(reader);
+ }
+ }
+
+ protected List<Map<String, Object>> getRecords(RecordReader reader) throws
IOException, MalformedRecordException {
+ List<Map<String, Object>> records = new ArrayList<>();
+
+ Record record;
+ while ((record = reader.nextRecord()) != null) {
+ records.add(record.toMap());
+ }
+
+ return records;
+ }
+
+ protected void testChain(RecordSetWriterFactory writer2,
RecordReaderFactory reader2) throws InitializationException {
+ testConversion(reader, readerConfigurer, writer2, null,
+ inputHandler,
+ mockFlowFile -> {
+ try {
+ testConversion(reader2, null, writer, writerConfigurer,
+ testRunner -> testRunner.enqueue(mockFlowFile),
+ resultHandler
+ );
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ protected <R extends RecordReaderFactory, W extends
RecordSetWriterFactory> void testConversion(
+ R reader,
+ Consumer<TestRunner> readerConfigurer,
+ W writer,
+ Consumer<TestRunner> writerConfigurer,
+ Consumer<TestRunner> inputHandler,
+ Consumer<MockFlowFile> resultHandler
+ ) throws InitializationException {
+ final TestRunner runner =
TestRunners.newTestRunner(ConvertRecord.class);
+
+ String readerId = UUID.randomUUID().toString();
+ String writerId = UUID.randomUUID().toString();
+
+ runner.addControllerService(readerId, reader);
+ runner.addControllerService(writerId, writer);
+
+ Optional.ofNullable(readerConfigurer).ifPresent(_configurer ->
_configurer.accept(runner));
+ Optional.ofNullable(writerConfigurer).ifPresent(_configurer ->
_configurer.accept(runner));
+
+ runner.enableControllerService(reader);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(ConvertRecord.RECORD_READER, readerId);
+ runner.setProperty(ConvertRecord.RECORD_WRITER, writerId);
+
+ inputHandler.accept(runner);
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
+
+ resultHandler.accept(flowFile);
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithExplicitSchemaIT.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithExplicitSchemaIT.java
new file mode 100644
index 0000000..81467e3
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithExplicitSchemaIT.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroReaderWithExplicitSchema;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.csv.CSVUtils;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.TestRunner;
+import org.junit.Before;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Conversion integration test variant in which readers take their schema from an explicit schema text
+ * (loaded from {@code explicit.schema.json}) and writers are configured not to write a schema or a CSV
+ * header line.
+ */
+public class ConversionWithExplicitSchemaIT extends AbstractConversionIT {
+    /** Avro schema text (JSON) shared by the reader configuration and by {@link #getRecords(byte[])}. */
+    private String schema;
+
+    /**
+     * Runs the parent set-up and then loads the explicit schema text from the test resources.
+     *
+     * @throws Exception if the parent set-up fails or the schema file cannot be read
+     */
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+
+        // Decode explicitly as UTF-8: new String(byte[]) would use the platform default charset,
+        // which makes the test depend on the machine it runs on.
+        schema = new String(
+            Files.readAllBytes(Paths.get("src/test/resources/TestConversions/explicit.schema.json")),
+            StandardCharsets.UTF_8
+        );
+    }
+
+    @Override
+    protected String csvPostfix() {
+        return "without_header.csv";
+    }
+
+    @Override
+    protected String jsonPostfix() {
+        return "json";
+    }
+
+    @Override
+    protected String avroPostfix() {
+        return "without_schema.avro";
+    }
+
+    @Override
+    protected String xmlPostfix() {
+        return "xml";
+    }
+
+    /**
+     * Points the reader at the explicit schema text instead of any other schema access strategy.
+     *
+     * @param testRunner runner on which the reader service is registered
+     */
+    @Override
+    protected void commonReaderConfiguration(TestRunner testRunner) {
+        testRunner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        testRunner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, schema);
+    }
+
+    /**
+     * Configures the writer with the "no-schema" write strategy and without a CSV header line.
+     *
+     * @param testRunner runner on which the writer service is registered
+     */
+    @Override
+    protected void commonWriterConfiguration(TestRunner testRunner) {
+        testRunner.setProperty(writer, "Schema Write Strategy", "no-schema");
+        testRunner.setProperty(writer, CSVUtils.INCLUDE_HEADER_LINE, "false");
+    }
+
+    /**
+     * Reads the given Avro bytes using the explicit schema, since the data is expected to carry no schema.
+     *
+     * @param avroData Avro content to deserialize
+     * @return the records produced by the parent {@code getRecords(RecordReader)} helper
+     * @throws IOException              if reading the data fails
+     * @throws MalformedRecordException if the data cannot be parsed into records
+     */
+    @Override
+    protected List<Map<String, Object>> getRecords(byte[] avroData) throws IOException, MalformedRecordException {
+        Schema avroSchema = new Schema.Parser().parse(schema);
+        RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
+
+        // Named avroReader so the local does not shadow the inherited 'reader' field.
+        try (RecordReader avroReader = new AvroReaderWithExplicitSchema(new ByteArrayInputStream(avroData), recordSchema, avroSchema)) {
+            return getRecords(avroReader);
+        }
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java
new file mode 100644
index 0000000..bed820c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+/**
+ * Conversion integration test variant that uses the fixtures carrying their own structure information
+ * (CSV with a header line, Avro with an embedded schema).
+ */
+public class ConversionWithSchemaInferenceIT extends AbstractConversionIT {
+    private static final String CSV_POSTFIX = "with_header.csv";
+    private static final String JSON_POSTFIX = "json";
+    private static final String AVRO_POSTFIX = "with_schema.avro";
+    private static final String XML_POSTFIX = "xml";
+
+    /**
+     * Postfix of the Avro file expected when the input is JSON. JSON schema inference doesn't discern
+     * INT and FLOAT but uses LONG and DOUBLE instead, so the expected Avro differs slightly: the
+     * deserialized values end up in Long and Double objects.
+     */
+    private static final String JSON_TO_AVRO_EXPECTED_POSTFIX = "with_schema.json.to.avro";
+
+    @Override
+    protected String csvPostfix() {
+        return CSV_POSTFIX;
+    }
+
+    @Override
+    protected String jsonPostfix() {
+        return JSON_POSTFIX;
+    }
+
+    @Override
+    protected String avroPostfix() {
+        return AVRO_POSTFIX;
+    }
+
+    @Override
+    protected String xmlPostfix() {
+        return XML_POSTFIX;
+    }
+
+    /**
+     * JSON-to-Avro conversion checked against the dedicated expected file rather than the common Avro fixture.
+     *
+     * @throws Exception if the conversion run fails
+     */
+    @Override
+    public void testJsonToAvro() throws Exception {
+        fromJson(jsonPostfix());
+        toAvro(JSON_TO_AVRO_EXPECTED_POSTFIX);
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler);
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.json
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.json
new file mode 100644
index 0000000..1971dcf
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.json
@@ -0,0 +1,22 @@
+[ {
+ "Id" : 1,
+ "Int_Float_String" : 3
+}, {
+ "Id" : 2,
+ "Int_Float_String" : 3.75
+}, {
+ "Id" : 3,
+ "Int_Float_String" : 3.85
+}, {
+ "Id" : 4,
+ "Int_Float_String" : 8
+}, {
+ "Id" : 5,
+ "Int_Float_String" : 2.0
+}, {
+ "Id" : 6,
+ "Int_Float_String" : 4.0
+}, {
+ "Id" : 7,
+ "Int_Float_String" : "some_string"
+} ]
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_header.csv
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_header.csv
new file mode 100644
index 0000000..d997288
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_header.csv
@@ -0,0 +1,8 @@
+Id,Int_Float_String
+1,3
+2,3.75
+3,3.85
+4,8
+5,2.0
+6,4.0
+7,some_string
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.avro
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.avro
new file mode 100644
index 0000000..3c18077
Binary files /dev/null and
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.avro
differ
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro
new file mode 100644
index 0000000..4177b05
Binary files /dev/null and
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro
differ
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_header.csv
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_header.csv
new file mode 100644
index 0000000..65cf365
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_header.csv
@@ -0,0 +1,7 @@
+1,3
+2,3.75
+3,3.85
+4,8
+5,2.0
+6,4.0
+7,some_string
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_schema.avro
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_schema.avro
new file mode 100644
index 0000000..c800fe6
Binary files /dev/null and
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_schema.avro
differ
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.xml
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.xml
new file mode 100644
index 0000000..36a9b10
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" ?>
+<root>
+ <nifiRecord>
+ <Id>1</Id>
+ <Int_Float_String>3</Int_Float_String>
+ </nifiRecord>
+ <nifiRecord>
+ <Id>2</Id>
+ <Int_Float_String>3.75</Int_Float_String>
+ </nifiRecord>
+ <nifiRecord>
+ <Id>3</Id>
+ <Int_Float_String>3.85</Int_Float_String>
+ </nifiRecord>
+ <nifiRecord>
+ <Id>4</Id>
+ <Int_Float_String>8</Int_Float_String>
+ </nifiRecord>
+ <nifiRecord>
+ <Id>5</Id>
+ <Int_Float_String>2.0</Int_Float_String>
+ </nifiRecord>
+ <nifiRecord>
+ <Id>6</Id>
+ <Int_Float_String>4.0</Int_Float_String>
+ </nifiRecord>
+ <nifiRecord>
+ <Id>7</Id>
+ <Int_Float_String>some_string</Int_Float_String>
+ </nifiRecord>
+</root>
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/explicit.schema.json
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/explicit.schema.json
new file mode 100644
index 0000000..5c578b0
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/explicit.schema.json
@@ -0,0 +1,23 @@
+{
+ "type":"record",
+ "name":"nifiRecord",
+ "namespace":"org.apache.nifi",
+ "fields":[
+ {
+ "name":"Id",
+ "type":[
+ "null",
+ "int"
+ ]
+ },
+ {
+ "name":"Int_Float_String",
+ "type":[
+ "int",
+ "float",
+ "string",
+ "null"
+ ]
+ }
+ ]
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/FieldTypeInference.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/FieldTypeInference.java
index e148baf..1f52cb8 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/FieldTypeInference.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/FieldTypeInference.java
@@ -23,6 +23,7 @@ import
org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Set;
public class FieldTypeInference {
@@ -33,7 +34,7 @@ public class FieldTypeInference {
// unique value for the data type, and so this paradigm allows us to avoid
the cost of creating
// and using the HashSet.
private DataType singleDataType = null;
- private Set<DataType> possibleDataTypes;
+ private Set<DataType> possibleDataTypes = new HashSet<>();
public void addPossibleDataType(final DataType dataType) {
if (dataType == null) {
@@ -45,7 +46,7 @@ public class FieldTypeInference {
return;
}
- if (possibleDataTypes == null && singleDataType.equals(dataType)) {
+ if (singleDataType.equals(dataType) ||
possibleDataTypes.contains(dataType)) {
return;
}
@@ -62,36 +63,42 @@ public class FieldTypeInference {
final RecordSchema newSchema = ((RecordDataType)
dataType).getChildSchema();
final RecordSchema mergedSchema =
DataTypeUtils.merge(singleDataTypeSchema, newSchema);
+ possibleDataTypes.remove(singleDataType);
singleDataType =
RecordFieldType.RECORD.getRecordDataType(mergedSchema);
+ possibleDataTypes.add(singleDataType);
return;
}
- if (singleFieldType.isWiderThan(additionalFieldType)) {
- // Assigned type is already wide enough to encompass the given type
- return;
+ if (possibleDataTypes.isEmpty()) {
+ possibleDataTypes.add(singleDataType);
}
- if (additionalFieldType.isWiderThan(singleFieldType)) {
- // The given type is wide enough to encompass the assigned type.
So changed the assigned type to the given type.
- singleDataType = dataType;
- return;
+ for (DataType possibleDataType : possibleDataTypes) {
+ RecordFieldType possibleFieldType =
possibleDataType.getFieldType();
+ if (!possibleFieldType.equals(RecordFieldType.STRING) &&
possibleFieldType.isWiderThan(additionalFieldType)) {
+ return;
+ }
}
- if (possibleDataTypes == null) {
- possibleDataTypes = new HashSet<>();
- possibleDataTypes.add(singleDataType);
+ Iterator<DataType> possibleDataTypeIterator =
possibleDataTypes.iterator();
+ while (possibleDataTypeIterator.hasNext()) {
+ DataType possibleDataType = possibleDataTypeIterator.next();
+ RecordFieldType possibleFieldType =
possibleDataType.getFieldType();
+
+ if (!additionalFieldType.equals(RecordFieldType.STRING) &&
additionalFieldType.isWiderThan(possibleFieldType)) {
+ possibleDataTypeIterator.remove();
+ }
}
possibleDataTypes.add(dataType);
}
-
/**
* Creates a single DataType that represents the field
* @return a single DataType that represents the field
*/
public DataType toDataType() {
- if (possibleDataTypes == null) {
+ if (possibleDataTypes.isEmpty()) {
if (singleDataType == null) {
return DEFAULT_DATA_TYPE;
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
index 9dc8f29..b8d6685 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
@@ -71,7 +71,13 @@ public class TestCSVSchemaInference {
assertEquals(RecordFieldType.TIMESTAMP.getDataType("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),
schema.getDataType("timestamp").get());
assertEquals(RecordFieldType.TIME.getDataType("HH:mm:ss"),
schema.getDataType("eventTime").get());
assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"),
schema.getDataType("eventDate").get());
- assertEquals(RecordFieldType.STRING.getDataType(),
schema.getDataType("maybeTime").get());
+ assertEquals(
+ RecordFieldType.CHOICE.getChoiceDataType(
+ RecordFieldType.TIME.getDataType("HH:mm:ss"),
+ RecordFieldType.STRING.getDataType()
+ ),
+ schema.getDataType("maybeTime").get()
+ );
assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"),
schema.getDataType("maybeDate").get());
assertSame(RecordFieldType.INT,
schema.getDataType("parentIds").get().getFieldType());
@@ -118,7 +124,13 @@ public class TestCSVSchemaInference {
assertEquals(RecordFieldType.TIMESTAMP.getDataType("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),
schema.getDataType("timestamp").get());
assertEquals(RecordFieldType.TIME.getDataType("HH:mm:ss"),
schema.getDataType("eventTime").get());
assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"),
schema.getDataType("eventDate").get());
- assertEquals(RecordFieldType.STRING.getDataType(),
schema.getDataType("maybeTime").get());
+ assertEquals(
+ RecordFieldType.CHOICE.getChoiceDataType(
+ RecordFieldType.TIME.getDataType("HH:mm:ss"),
+ RecordFieldType.STRING.getDataType()
+ ),
+ schema.getDataType("maybeTime").get()
+ );
assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"),
schema.getDataType("maybeDate").get());
assertSame(RecordFieldType.INT,
schema.getDataType("parentIds").get().getFieldType());
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
index a012ebb..1cfaafd 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
@@ -117,7 +117,7 @@ public class TestWriteCSVResult {
final String values = splits[1];
final StringBuilder expectedBuilder = new StringBuilder();
-
expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\""
+ dateValue + "\",\"" + timeValue + "\",\"" + timestampValue +
"\",\"c\",\"a孟bc李12儒3\",,\"48\",,");
+
expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\""
+ timestampValue + "\",\"" + dateValue + "\",\"" + timeValue +
"\",\"c\",\"a孟bc李12儒3\",,\"48\",,");
final String expectedValues = expectedBuilder.toString();
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java
index 4a10356..a4f356f 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java
@@ -153,7 +153,10 @@ public class TestInferJsonSchemaAccessStrategy {
 // TIME value and a STRING should be inferred as a CHOICE field
final RecordField maybeTimeField = schema.getField("maybeTime").get();
- assertEquals(RecordFieldType.STRING,
maybeTimeField.getDataType().getFieldType());
+ assertEquals(
+ RecordFieldType.CHOICE.getChoiceDataType().getFieldType(),
+ maybeTimeField.getDataType().getFieldType())
+ ;
// DATE value and a null value should be inferred as a DATE field
final RecordField maybeDateField = schema.getField("maybeDate").get();
@@ -169,7 +172,7 @@ public class TestInferJsonSchemaAccessStrategy {
final RecordSchema schema = inferSchema(file);
assertSame(RecordFieldType.STRING,
schema.getDataType("name").get().getFieldType());
- assertSame(RecordFieldType.STRING,
schema.getDataType("age").get().getFieldType());
+ assertSame(RecordFieldType.CHOICE,
schema.getDataType("age").get().getFieldType());
final DataType valuesDataType = schema.getDataType("values").get();
assertSame(RecordFieldType.CHOICE, valuesDataType.getFieldType());
@@ -178,7 +181,10 @@ public class TestInferJsonSchemaAccessStrategy {
final List<DataType> possibleTypes =
valuesChoiceType.getPossibleSubTypes();
assertEquals(2, possibleTypes.size());
assertTrue(possibleTypes.contains(RecordFieldType.STRING.getDataType()));
-
assertTrue(possibleTypes.contains(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
+
assertTrue(possibleTypes.contains(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.CHOICE.getChoiceDataType(
+ RecordFieldType.LONG.getDataType(),
+ RecordFieldType.STRING.getDataType()
+ ))));
assertSame(RecordFieldType.STRING,
schema.getDataType("nullValue").get().getFieldType());
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
new file mode 100644
index 0000000..caf0038
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.inference;
+
+import com.google.common.collect.Sets;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import static com.google.common.collect.Collections2.permutations;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests which data type {@link FieldTypeInference#toDataType()} reports after a series of
+ * {@link FieldTypeInference#addPossibleDataType(DataType)} calls, for every ordering of the added types.
+ */
+public class TestFieldTypeInference {
+    private FieldTypeInference testSubject;
+
+    @Before
+    public void setUp() throws Exception {
+        testSubject = new FieldTypeInference();
+    }
+
+    @Test
+    public void testToDataTypeWith_SHORT_INT_LONG_shouldReturn_LONG() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+            RecordFieldType.SHORT.getDataType(),
+            RecordFieldType.INT.getDataType(),
+            RecordFieldType.LONG.getDataType()
+        );
+
+        DataType expected = RecordFieldType.LONG.getDataType();
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, dataTypes, expected);
+    }
+
+    @Test
+    public void testToDataTypeWith_INT_FLOAT_ShouldReturn_INT_FLOAT() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+            RecordFieldType.INT.getDataType(),
+            RecordFieldType.FLOAT.getDataType()
+        );
+
+        Set<DataType> expected = Sets.newHashSet(
+            RecordFieldType.INT.getDataType(),
+            RecordFieldType.FLOAT.getDataType()
+        );
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnChoice, dataTypes, expected);
+    }
+
+    @Test
+    public void testToDataTypeWith_INT_STRING_shouldReturn_INT_STRING() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+            RecordFieldType.INT.getDataType(),
+            RecordFieldType.STRING.getDataType()
+        );
+
+        Set<DataType> expected = Sets.newHashSet(
+            RecordFieldType.INT.getDataType(),
+            RecordFieldType.STRING.getDataType()
+        );
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnChoice, dataTypes, expected);
+    }
+
+    @Test
+    public void testToDataTypeWith_INT_FLOAT_STRING_shouldReturn_INT_FLOAT_STRING() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+            RecordFieldType.INT.getDataType(),
+            RecordFieldType.FLOAT.getDataType(),
+            RecordFieldType.STRING.getDataType()
+        );
+
+        Set<DataType> expected = Sets.newHashSet(
+            RecordFieldType.INT.getDataType(),
+            RecordFieldType.FLOAT.getDataType(),
+            RecordFieldType.STRING.getDataType()
+        );
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnChoice, dataTypes, expected);
+    }
+
+    @Test
+    public void testToDataTypeWithMultipleRecord() {
+        // GIVEN
+        String fieldName = "fieldName";
+        DataType fieldType1 = RecordFieldType.INT.getDataType();
+        DataType fieldType2 = RecordFieldType.FLOAT.getDataType();
+        DataType fieldType3 = RecordFieldType.STRING.getDataType();
+
+        // fieldType2 appears twice on purpose: a repeated record type must not change the outcome.
+        List<DataType> dataTypes = Arrays.asList(
+            RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, fieldType1)),
+            RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, fieldType2)),
+            RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, fieldType3)),
+            RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, fieldType2))
+        );
+
+        DataType expected = RecordFieldType.RECORD.getRecordDataType(createRecordSchema(
+            fieldName,
+            RecordFieldType.CHOICE.getChoiceDataType(
+                fieldType1,
+                fieldType2,
+                fieldType3
+            )
+        ));
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, dataTypes, expected);
+    }
+
+    /**
+     * Builds a record schema consisting of a single field.
+     *
+     * @param fieldName name of the only field
+     * @param fieldType data type of the only field
+     * @return the single-field schema
+     */
+    private SimpleRecordSchema createRecordSchema(String fieldName, DataType fieldType) {
+        return new SimpleRecordSchema(Arrays.asList(
+            new RecordField(fieldName, fieldType)
+        ));
+    }
+
+    /**
+     * Applies {@code test} to every permutation of {@code input}, each against a fresh
+     * {@link FieldTypeInference}.
+     *
+     * @param test     check to run; receives one permutation and the expected value
+     * @param input    elements whose orderings are enumerated
+     * @param expected value every ordering is expected to produce
+     */
+    private <I, E> void runWithAllPermutations(BiFunction<List<I>, E, ?> test, List<I> input, E expected) {
+        permutations(input).forEach(inputPermutation -> {
+            // Reset the subject for every ordering. Reusing one instance would carry over the types added
+            // by earlier permutations, so only the first ordering would actually be exercised.
+            testSubject = new FieldTypeInference();
+
+            test.apply(inputPermutation, expected);
+        });
+    }
+
+    /**
+     * Adds the given types in order and asserts that the result is a choice over exactly {@code expected}
+     * (compared as a set, so the order of the sub types is not significant).
+     *
+     * @return always {@code null}; the {@code Void} return type only exists to fit {@link BiFunction}
+     */
+    private Void testToDataTypeShouldReturnChoice(List<DataType> dataTypes, Set<DataType> expected) {
+        // GIVEN
+        dataTypes.forEach(testSubject::addPossibleDataType);
+
+        // WHEN
+        DataType actual = testSubject.toDataType();
+
+        // THEN
+        assertEquals(expected, new HashSet<>(((ChoiceDataType) actual).getPossibleSubTypes()));
+
+        return null;
+    }
+
+    /**
+     * Adds the given types in order and asserts that the result equals the single {@code expected} type.
+     *
+     * @return always {@code null}; the {@code Void} return type only exists to fit {@link BiFunction}
+     */
+    private Void testToDataTypeShouldReturnSingleType(List<DataType> dataTypes, DataType expected) {
+        // GIVEN
+        dataTypes.forEach(testSubject::addPossibleDataType);
+
+        // WHEN
+        DataType actual = testSubject.toDataType();
+
+        // THEN
+        assertEquals(expected, actual);
+
+        return null;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestInferXmlSchema.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestInferXmlSchema.java
index 75d6988..56ae4e6 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestInferXmlSchema.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestInferXmlSchema.java
@@ -85,7 +85,13 @@ public class TestInferXmlSchema {
assertSame(RecordFieldType.STRING,
schema.getDataType("COUNTRY").get().getFieldType());
assertEquals(RecordFieldType.DATE.getDataType(timeValueInference.getDateFormat()),
schema.getDataType("DOB").get());
- assertEquals(RecordFieldType.STRING.getDataType(),
schema.getDataType("TOB").get());
+ assertEquals(
+ RecordFieldType.CHOICE.getChoiceDataType(
+ RecordFieldType.TIME.getDataType("HH:mm:ss"),
+ RecordFieldType.STRING.getDataType()
+ ),
+ schema.getDataType("TOB").get()
+ );
assertEquals(RecordFieldType.TIMESTAMP.getDataType(timeValueInference.getTimestampFormat()),
schema.getDataType("TSOB").get());
final DataType addressDataType = schema.getDataType("ADDRESS").get();
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
index b4c73f8..0472512 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
@@ -7,9 +7,9 @@
"bigint" : 8,
"float" : 8.0,
"double" : 8.0,
+ "timestamp" : "2017-01-01 17:00:00",
"date" : "2017-01-01",
"time" : "17:00:00",
- "timestamp" : "2017-01-01 17:00:00",
"char" : "c",
"string" : "string",
"record" : null,