Repository: nifi Updated Branches: refs/heads/master f019d509f -> 40a9cd4f2
NIFI-3919: Let AvroTypeUtil try every possible type Before this fix, AvroTypeUtil could throw an Exception before trying every possible data type defined within a union field. This closes #1816. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/40a9cd4f Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/40a9cd4f Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/40a9cd4f Branch: refs/heads/master Commit: 40a9cd4f2ead4d0388581921b0de9108a6e97946 Parents: f019d50 Author: Koji Kawamura <ijokaruma...@apache.org> Authored: Wed May 17 15:34:01 2017 +0900 Committer: Mark Payne <marka...@hotmail.com> Committed: Wed May 17 10:53:27 2017 -0400 ---------------------------------------------------------------------- .../java/org/apache/nifi/avro/AvroTypeUtil.java | 20 ++++++--- .../nifi-record-serialization-services/pom.xml | 1 + .../avro/TestAvroReaderWithEmbeddedSchema.java | 43 ++++++++++++++++++++ .../src/test/resources/avro/multiple-types.avsc | 16 ++++++++ 4 files changed, 75 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/40a9cd4f/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index 6494fe5..daf4031 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -39,6 +39,8 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.SchemaIdentifier; import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.math.BigDecimal; @@ -58,6 +60,7 @@ import java.util.function.Function; import java.util.stream.Collectors; public class AvroTypeUtil { + private static final Logger logger = LoggerFactory.getLogger(AvroTypeUtil.class); public static final String AVRO_SCHEMA_FORMAT = "avro"; private static final String LOGICAL_TYPE_DATE = "date"; @@ -459,12 +462,19 @@ public class AvroTypeUtil { // If at least one non-null type exists, find the first compatible type if (nonNullFieldSchemas.size() >= 1) { for (final Schema nonNullFieldSchema : nonNullFieldSchemas) { - final Object convertedValue = conversion.apply(nonNullFieldSchema); final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema); - if (DataTypeUtils.isCompatibleDataType(convertedValue, desiredDataType) - // For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue - || (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType))) { - return convertedValue; + try { + final Object convertedValue = conversion.apply(nonNullFieldSchema); + if (DataTypeUtils.isCompatibleDataType(convertedValue, desiredDataType) + // For logical types those store with different type (e.g. 
BigDecimal as ByteBuffer), check compatibility using the original rawValue + || (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType))) { + return convertedValue; + } + } catch (Exception e) { + // If failed with one of possible types, continue with the next available option. + if (logger.isDebugEnabled()) { + logger.debug("Cannot convert value {} to type {}", originalValue, desiredDataType, e); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/40a9cd4f/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index 5dc1160..388d52f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -95,6 +95,7 @@ <exclude>src/test/resources/avro/datatypes.avsc</exclude> <exclude>src/test/resources/avro/logical-types.avsc</exclude> <exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude> + <exclude>src/test/resources/avro/multiple-types.avsc</exclude> <exclude>src/test/resources/csv/extra-white-space.csv</exclude> <exclude>src/test/resources/csv/multi-bank-account.csv</exclude> <exclude>src/test/resources/csv/single-bank-account.csv</exclude> http://git-wip-us.apache.org/repos/asf/nifi/blob/40a9cd4f/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java index 6782d33..5c04cfa 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java @@ -290,6 +290,49 @@ public class TestAvroReaderWithEmbeddedSchema { } } + @Test + public void testMultipleTypes() throws IOException, ParseException, MalformedRecordException, SchemaNotFoundException { + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/multiple-types.avsc")); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final byte[] serialized; + final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); + try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter); + final DataFileWriter<GenericRecord> writer = dataFileWriter.create(schema, baos)) { + + // If a union field has multiple type options, a value should be mapped to the first compatible type. 
+ final GenericRecord r1 = new GenericData.Record(schema); + r1.put("field", 123); + + final GenericRecord r2 = new GenericData.Record(schema); + r2.put("field", Arrays.asList(1, 2, 3)); + + final GenericRecord r3 = new GenericData.Record(schema); + r3.put("field", "not a number"); + + writer.append(r1); + writer.append(r2); + writer.append(r3); + writer.flush(); + + serialized = baos.toByteArray(); + } + + try (final InputStream in = new ByteArrayInputStream(serialized)) { + final AvroRecordReader reader = new AvroReaderWithEmbeddedSchema(in); + final RecordSchema recordSchema = reader.getSchema(); + + assertEquals(RecordFieldType.CHOICE, recordSchema.getDataType("field").get().getFieldType()); + + Record record = reader.nextRecord(); + assertEquals(123, record.getValue("field")); + record = reader.nextRecord(); + assertArrayEquals(new Object[]{1, 2, 3}, (Object[]) record.getValue("field")); + record = reader.nextRecord(); + assertEquals("not a number", record.getValue("field")); + } + } + private Object[] toObjectArray(final byte[] bytes) { final Object[] array = new Object[bytes.length]; for (int i = 0; i < bytes.length; i++) { http://git-wip-us.apache.org/repos/asf/nifi/blob/40a9cd4f/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/multiple-types.avsc ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/multiple-types.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/multiple-types.avsc new file mode 100644 index 0000000..670555e --- /dev/null +++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/multiple-types.avsc @@ -0,0 +1,16 @@ +{ + "namespace": "nifi", + "name": "data_types", + "type": "record", + "fields": [ + { + "name": "field", + "type": [ + "null", + "int", + {"type": "array", "items": "int"}, + "string" + ] + } + ] +} \ No newline at end of file