Repository: nifi
Updated Branches:
  refs/heads/master b6bdc4a0a -> 72de1cbde


NIFI-3861: Fixed AvroReader nullable logical types issue

- AvroReader did not convert logical types if they are defined within a union
- Consolidated createSchema method in AvroSchemaRegistry and AvroTypeUtil as
both had identical implementations and maintaining both would be error-prone

This closes #1779.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/72de1cbd
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/72de1cbd
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/72de1cbd

Branch: refs/heads/master
Commit: 72de1cbdef05b0b61ed78871b15f2d06711760a1
Parents: b6bdc4a
Author: Koji Kawamura <[email protected]>
Authored: Wed May 10 22:13:06 2017 +0900
Committer: Mark Payne <[email protected]>
Committed: Wed May 10 10:08:22 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/avro/AvroTypeUtil.java | 194 ++++++++++++-------
 .../nifi-registry-service/pom.xml               |   4 +-
 .../services/AvroSchemaRegistry.java            | 126 +-----------
 .../nifi-record-serialization-services/pom.xml  |   1 +
 .../avro/TestAvroReaderWithEmbeddedSchema.java  |  11 +-
 .../resources/avro/logical-types-nullable.avsc  |  69 +++++++
 6 files changed, 212 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/72de1cbd/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index bfdba3d..38e87a8 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -18,7 +18,6 @@
 package org.apache.nifi.avro;
 
 import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
@@ -43,17 +42,25 @@ import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class AvroTypeUtil {
     public static final String AVRO_SCHEMA_FORMAT = "avro";
 
+    private static final String LOGICAL_TYPE_DATE = "date";
+    private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis";
+    private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros";
+    private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = 
"timestamp-millis";
+    private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = 
"timestamp-micros";
+
     public static Schema extractAvroSchema(final RecordSchema recordSchema) 
throws SchemaNotFoundException {
         if (recordSchema == null) {
             throw new IllegalArgumentException("RecordSchema cannot be null");
@@ -78,16 +85,36 @@ public class AvroTypeUtil {
         return new Schema.Parser().parse(text);
     }
 
+    /**
+     * Returns a DataType for the given Avro Schema
+     *
+     * @param avroSchema the Avro Schema to convert
+     * @return a Data Type that corresponds to the given Avro Schema
+     */
     public static DataType determineDataType(final Schema avroSchema) {
         final Type avroType = avroSchema.getType();
 
+        final LogicalType logicalType = avroSchema.getLogicalType();
+        if (logicalType != null) {
+            final String logicalTypeName = logicalType.getName();
+            switch (logicalTypeName) {
+                case LOGICAL_TYPE_DATE:
+                    return RecordFieldType.DATE.getDataType();
+                case LOGICAL_TYPE_TIME_MILLIS:
+                case LOGICAL_TYPE_TIME_MICROS:
+                    return RecordFieldType.TIME.getDataType();
+                case LOGICAL_TYPE_TIMESTAMP_MILLIS:
+                case LOGICAL_TYPE_TIMESTAMP_MICROS:
+                    return RecordFieldType.TIMESTAMP.getDataType();
+            }
+        }
+
         switch (avroType) {
+            case ARRAY:
+                return 
RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType()));
             case BYTES:
             case FIXED:
                 return 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
-            case ARRAY:
-                final DataType elementType = 
determineDataType(avroSchema.getElementType());
-                return RecordFieldType.ARRAY.getArrayDataType(elementType);
             case BOOLEAN:
                 return RecordFieldType.BOOLEAN.getDataType();
             case DOUBLE:
@@ -97,36 +124,10 @@ public class AvroTypeUtil {
                 return RecordFieldType.STRING.getDataType();
             case FLOAT:
                 return RecordFieldType.FLOAT.getDataType();
-            case INT: {
-                final LogicalType logicalType = avroSchema.getLogicalType();
-                if (logicalType == null) {
-                    return RecordFieldType.INT.getDataType();
-                }
-
-                if 
(LogicalTypes.date().getName().equals(logicalType.getName())) {
-                    return RecordFieldType.DATE.getDataType();
-                } else if 
(LogicalTypes.timeMillis().getName().equals(logicalType.getName())) {
-                    return RecordFieldType.TIME.getDataType();
-                }
-
+            case INT:
                 return RecordFieldType.INT.getDataType();
-            }
-            case LONG: {
-                final LogicalType logicalType = avroSchema.getLogicalType();
-                if (logicalType == null) {
-                    return RecordFieldType.LONG.getDataType();
-                }
-
-                if 
(LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) {
-                    return RecordFieldType.TIMESTAMP.getDataType();
-                } else if 
(LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) {
-                    return RecordFieldType.TIMESTAMP.getDataType();
-                } else if 
(LogicalTypes.timeMicros().getName().equals(logicalType.getName())) {
-                    return RecordFieldType.TIME.getDataType();
-                }
-
+            case LONG:
                 return RecordFieldType.LONG.getDataType();
-            }
             case RECORD: {
                 final List<Field> avroFields = avroSchema.getFields();
                 final List<RecordField> recordFields = new 
ArrayList<>(avroFields.size());
@@ -135,6 +136,7 @@ public class AvroTypeUtil {
                     final String fieldName = field.name();
                     final Schema fieldSchema = field.schema();
                     final DataType fieldType = determineDataType(fieldSchema);
+
                     recordFields.add(new RecordField(fieldName, fieldType, 
field.defaultVal(), field.aliases()));
                 }
 
@@ -148,9 +150,7 @@ public class AvroTypeUtil {
                 final DataType valueType = determineDataType(valueSchema);
                 return RecordFieldType.MAP.getMapDataType(valueType);
             case UNION: {
-                final List<Schema> nonNullSubSchemas = 
avroSchema.getTypes().stream()
-                    .filter(s -> s.getType() != Type.NULL)
-                    .collect(Collectors.toList());
+                final List<Schema> nonNullSubSchemas = 
getNonNullSubSchemas(avroSchema);
 
                 if (nonNullSubSchemas.size() == 1) {
                     return determineDataType(nonNullSubSchemas.get(0));
@@ -169,11 +169,37 @@ public class AvroTypeUtil {
         return null;
     }
 
+    private static List<Schema> getNonNullSubSchemas(Schema avroSchema) {
+        List<Schema> unionFieldSchemas = avroSchema.getTypes();
+        if (unionFieldSchemas == null) {
+            return Collections.emptyList();
+        }
+        return unionFieldSchemas.stream()
+                .filter(s -> s.getType() != Type.NULL)
+                .collect(Collectors.toList());
+    }
+
     public static RecordSchema createSchema(final Schema avroSchema) {
         if (avroSchema == null) {
             throw new IllegalArgumentException("Avro Schema cannot be null");
         }
 
+        return createSchema(avroSchema, avroSchema.toString(), 
SchemaIdentifier.EMPTY);
+    }
+
+    /**
+     * Converts an Avro Schema to a RecordSchema
+     *
+     * @param avroSchema the Avro Schema to convert
+     * @param schemaText the textual representation of the schema
+     * @param schemaId the identifier of the schema
+     * @return the Corresponding Record Schema
+     */
+    public static RecordSchema createSchema(final Schema avroSchema, final 
String schemaText, final SchemaIdentifier schemaId) {
+        if (avroSchema == null) {
+            throw new IllegalArgumentException("Avro Schema cannot be null");
+        }
+
         final List<RecordField> recordFields = new 
ArrayList<>(avroSchema.getFields().size());
         for (final Field field : avroSchema.getFields()) {
             final String fieldName = field.name();
@@ -182,7 +208,7 @@ public class AvroTypeUtil {
             recordFields.add(new RecordField(fieldName, dataType, 
field.defaultVal(), field.aliases()));
         }
 
-        final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, 
avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY);
+        final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, 
schemaText, AVRO_SCHEMA_FORMAT, schemaId);
         return recordSchema;
     }
 
@@ -227,7 +253,11 @@ public class AvroTypeUtil {
         return rec;
     }
 
-    private static Object convertToAvroObject(final Object rawValue, final 
Schema fieldSchema, final String fieldName) throws IOException {
+    /**
+     * Convert a raw value to an Avro object to serialize in Avro type system.
+     * The counter-part method which reads an Avro object back to a raw value 
is {@link #normalizeValue(Object, Schema)}.
+     */
+    private static Object convertToAvroObject(final Object rawValue, final 
Schema fieldSchema, final String fieldName) {
         if (rawValue == null) {
             return null;
         }
@@ -239,13 +269,13 @@ public class AvroTypeUtil {
                     return DataTypeUtils.toInteger(rawValue, fieldName);
                 }
 
-                if 
(LogicalTypes.date().getName().equals(logicalType.getName())) {
+                if (LOGICAL_TYPE_DATE.equals(logicalType.getName())) {
                     final long longValue = DataTypeUtils.toLong(rawValue, 
fieldName);
                     final Date date = new Date(longValue);
                     final Duration duration = Duration.between(new 
Date(0L).toInstant(), date.toInstant());
                     final long days = duration.toDays();
                     return (int) days;
-                } else if 
(LogicalTypes.timeMillis().getName().equals(logicalType.getName())) {
+                } else if 
(LOGICAL_TYPE_TIME_MILLIS.equals(logicalType.getName())) {
                     final long longValue = DataTypeUtils.toLong(rawValue, 
fieldName);
                     final Date date = new Date(longValue);
                     final Duration duration = 
Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), 
date.toInstant());
@@ -261,14 +291,14 @@ public class AvroTypeUtil {
                     return DataTypeUtils.toLong(rawValue, fieldName);
                 }
 
-                if 
(LogicalTypes.timeMicros().getName().equals(logicalType.getName())) {
+                if (LOGICAL_TYPE_TIME_MICROS.equals(logicalType.getName())) {
                     final long longValue = DataTypeUtils.toLong(rawValue, 
fieldName);
                     final Date date = new Date(longValue);
                     final Duration duration = 
Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), 
date.toInstant());
                     return duration.toMillis() * 1000L;
-                } else if 
(LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) {
+                } else if 
(LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalType.getName())) {
                     return DataTypeUtils.toLong(rawValue, fieldName);
-                } else if 
(LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) {
+                } else if 
(LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalType.getName())) {
                     return DataTypeUtils.toLong(rawValue, fieldName) * 1000L;
                 }
 
@@ -319,28 +349,25 @@ public class AvroTypeUtil {
                 }
                 return avroRecord;
             case UNION:
-                List<Schema> unionFieldSchemas = fieldSchema.getTypes();
-                if (unionFieldSchemas != null) {
-                    // Ignore null types in union
-                    final List<Schema> nonNullFieldSchemas = 
unionFieldSchemas.stream()
-                            .filter(s -> s.getType() != Type.NULL)
-                            .collect(Collectors.toList());
-
-                    // If at least one non-null type exists, find the first 
compatible type
-                    if (nonNullFieldSchemas.size() >= 1) {
-                        for (final Schema nonNullFieldSchema : 
nonNullFieldSchemas) {
-                            final Object avroObject = 
convertToAvroObject(rawValue, nonNullFieldSchema, fieldName);
-                            final DataType desiredDataType = 
AvroTypeUtil.determineDataType(nonNullFieldSchema);
-                            if (DataTypeUtils.isCompatibleDataType(avroObject, 
desiredDataType)) {
-                                return avroObject;
-                            }
+                // Ignore null types in union
+                final List<Schema> nonNullFieldSchemas = 
getNonNullSubSchemas(fieldSchema);
+
+                // If at least one non-null type exists, find the first 
compatible type
+                if (nonNullFieldSchemas.size() >= 1) {
+                    for (final Schema nonNullFieldSchema : 
nonNullFieldSchemas) {
+                        final Object avroObject = 
convertToAvroObject(rawValue, nonNullFieldSchema, fieldName);
+                        final DataType desiredDataType = 
AvroTypeUtil.determineDataType(nonNullFieldSchema);
+                        if (DataTypeUtils.isCompatibleDataType(avroObject, 
desiredDataType)
+                                // For logical types those store with 
different type (e.g. BigDecimal as ByteBuffer), check compatibility using the 
original rawValue
+                                || (nonNullFieldSchema.getLogicalType() != 
null && DataTypeUtils.isCompatibleDataType(rawValue, desiredDataType))) {
+                            return avroObject;
                         }
-
-                        throw new IllegalTypeConversionException("Cannot 
convert value " + rawValue + " of type " + rawValue.getClass()
-                                + " because no compatible types exist in the 
UNION");
                     }
+
+                    throw new IllegalTypeConversionException("Cannot convert 
value " + rawValue + " of type " + rawValue.getClass()
+                            + " because no compatible types exist in the 
UNION");
                 }
-                return null;
+                return convertUnionFieldValue(rawValue, fieldSchema, schema -> 
convertToAvroObject(rawValue, schema, fieldName));
             case ARRAY:
                 final Object[] objectArray = (Object[]) rawValue;
                 final List<Object> list = new ArrayList<>(objectArray.length);
@@ -399,6 +426,39 @@ public class AvroTypeUtil {
         return values;
     }
 
+    /**
+     * Convert value of a nullable union field.
+     * @param originalValue original value
+     * @param fieldSchema the union field schema
+     * @param conversion the conversion function which takes a non-null field 
schema within the union field and returns a converted value
+     * @return a converted value
+     */
+    private static Object convertUnionFieldValue(Object originalValue, Schema 
fieldSchema, Function<Schema, Object> conversion) {
+        // Ignore null types in union
+        final List<Schema> nonNullFieldSchemas = 
getNonNullSubSchemas(fieldSchema);
+
+        // If at least one non-null type exists, find the first compatible type
+        if (nonNullFieldSchemas.size() >= 1) {
+            for (final Schema nonNullFieldSchema : nonNullFieldSchemas) {
+                final Object convertedValue = 
conversion.apply(nonNullFieldSchema);
+                final DataType desiredDataType = 
AvroTypeUtil.determineDataType(nonNullFieldSchema);
+                if (DataTypeUtils.isCompatibleDataType(convertedValue, 
desiredDataType)
+                        // For logical types those store with different type 
(e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue
+                        || (nonNullFieldSchema.getLogicalType() != null && 
DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType))) {
+                    return convertedValue;
+                }
+            }
+
+            throw new IllegalTypeConversionException("Cannot convert value " + 
originalValue + " of type " + originalValue.getClass()
+                    + " because no compatible types exist in the UNION");
+        }
+        return null;
+    }
+
+    /**
+     * Convert an Avro object to a normal Java objects for further processing.
+     * The counter-part method which convert a raw value to an Avro object is 
{@link #convertToAvroObject(Object, Schema, String)}
+     */
     private static Object normalizeValue(final Object value, final Schema 
avroSchema) {
         if (value == null) {
             return null;
@@ -412,10 +472,10 @@ public class AvroTypeUtil {
                 }
 
                 final String logicalName = logicalType.getName();
-                if (LogicalTypes.date().getName().equals(logicalName)) {
+                if (LOGICAL_TYPE_DATE.equals(logicalName)) {
                     // date logical name means that the value is number of 
days since Jan 1, 1970
                     return new java.sql.Date(TimeUnit.DAYS.toMillis((int) 
value));
-                } else if (LogicalTypes.timeMillis().equals(logicalName)) {
+                } else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalName)) {
                     // time-millis logical name means that the value is number 
of milliseconds since midnight.
                     return new java.sql.Time((int) value);
                 }
@@ -429,11 +489,11 @@ public class AvroTypeUtil {
                 }
 
                 final String logicalName = logicalType.getName();
-                if (LogicalTypes.timeMicros().getName().equals(logicalName)) {
+                if (LOGICAL_TYPE_TIME_MICROS.equals(logicalName)) {
                     return new 
java.sql.Time(TimeUnit.MICROSECONDS.toMillis((long) value));
-                } else if 
(LogicalTypes.timestampMillis().getName().equals(logicalName)) {
+                } else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalName)) {
                     return new java.sql.Timestamp((long) value);
-                } else if 
(LogicalTypes.timestampMicros().getName().equals(logicalName)) {
+                } else if (LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalName)) {
                     return new 
java.sql.Timestamp(TimeUnit.MICROSECONDS.toMillis((long) value));
                 }
                 break;
@@ -443,7 +503,7 @@ public class AvroTypeUtil {
                     final GenericData.Record avroRecord = (GenericData.Record) 
value;
                     return normalizeValue(value, avroRecord.getSchema());
                 }
-                break;
+                return convertUnionFieldValue(value, avroSchema, schema -> 
normalizeValue(value, schema));
             case RECORD:
                 final GenericData.Record record = (GenericData.Record) value;
                 final Schema recordSchema = record.getSchema();

http://git-wip-us.apache.org/repos/asf/nifi/blob/72de1cbd/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
index 6aceaa0..54dc090 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
@@ -35,8 +35,8 @@
                        <artifactId>nifi-record</artifactId>
                </dependency>
                <dependency>
-                       <groupId>org.apache.avro</groupId>
-                       <artifactId>avro</artifactId>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-avro-record-utils</artifactId>
                </dependency>
                <dependency>
                        <groupId>org.apache.nifi</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/72de1cbd/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
index f48d0f5..169d79d 100644
--- 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
@@ -17,34 +17,26 @@
 package org.apache.nifi.schemaregistry.services;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
-import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaField;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.SchemaIdentifier;
 
@@ -57,12 +49,6 @@ public class AvroSchemaRegistry extends 
AbstractControllerService implements Sch
     private final Map<String, String> schemaNameToSchemaMap;
     private final ConcurrentMap<String, RecordSchema> recordSchemas = new 
ConcurrentHashMap<>();
 
-    private static final String LOGICAL_TYPE_DATE = "date";
-    private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis";
-    private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros";
-    private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = 
"timestamp-millis";
-    private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = 
"timestamp-micros";
-
     public AvroSchemaRegistry() {
         this.schemaNameToSchemaMap = new HashMap<>();
     }
@@ -74,7 +60,8 @@ public class AvroSchemaRegistry extends 
AbstractControllerService implements Sch
         } else {
             try {
                 final Schema avroSchema = new Schema.Parser().parse(newValue);
-                final RecordSchema recordSchema = 
createRecordSchema(avroSchema, newValue, descriptor.getName());
+                final SchemaIdentifier schemaId = 
SchemaIdentifier.ofName(descriptor.getName());
+                final RecordSchema recordSchema = 
AvroTypeUtil.createSchema(avroSchema, newValue, schemaId);
                 recordSchemas.put(descriptor.getName(), recordSchema);
             } catch (final Exception e) {
                 // not a problem - the service won't be valid and the 
validation message will indicate what is wrong.
@@ -137,113 +124,6 @@ public class AvroSchemaRegistry extends 
AbstractControllerService implements Sch
     }
 
 
-    /**
-     * Converts an Avro Schema to a RecordSchema
-     *
-     * @param avroSchema the Avro Schema to convert
-     * @param text the textual representation of the schema
-     * @param schemaName the name of the schema
-     * @return the Corresponding Record Schema
-     */
-    private RecordSchema createRecordSchema(final Schema avroSchema, final 
String text, final String schemaName) {
-        final List<RecordField> recordFields = new 
ArrayList<>(avroSchema.getFields().size());
-        for (final Field field : avroSchema.getFields()) {
-            final String fieldName = field.name();
-            final DataType dataType = determineDataType(field.schema());
-
-            recordFields.add(new RecordField(fieldName, dataType, 
field.defaultVal(), field.aliases()));
-        }
-
-        final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, 
text, "avro", SchemaIdentifier.ofName(schemaName));
-        return recordSchema;
-    }
-
-    /**
-     * Returns a DataType for the given Avro Schema
-     *
-     * @param avroSchema the Avro Schema to convert
-     * @return a Data Type that corresponds to the given Avro Schema
-     */
-    private DataType determineDataType(final Schema avroSchema) {
-        final Type avroType = avroSchema.getType();
-
-        final LogicalType logicalType = avroSchema.getLogicalType();
-        if (logicalType != null) {
-            final String logicalTypeName = logicalType.getName();
-            switch (logicalTypeName) {
-                case LOGICAL_TYPE_DATE:
-                    return RecordFieldType.DATE.getDataType();
-                case LOGICAL_TYPE_TIME_MILLIS:
-                case LOGICAL_TYPE_TIME_MICROS:
-                    return RecordFieldType.TIME.getDataType();
-                case LOGICAL_TYPE_TIMESTAMP_MILLIS:
-                case LOGICAL_TYPE_TIMESTAMP_MICROS:
-                    return RecordFieldType.TIMESTAMP.getDataType();
-            }
-        }
-
-        switch (avroType) {
-            case ARRAY:
-                return 
RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType()));
-            case BYTES:
-            case FIXED:
-                return 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
-            case BOOLEAN:
-                return RecordFieldType.BOOLEAN.getDataType();
-            case DOUBLE:
-                return RecordFieldType.DOUBLE.getDataType();
-            case ENUM:
-            case STRING:
-                return RecordFieldType.STRING.getDataType();
-            case FLOAT:
-                return RecordFieldType.FLOAT.getDataType();
-            case INT:
-                return RecordFieldType.INT.getDataType();
-            case LONG:
-                return RecordFieldType.LONG.getDataType();
-            case RECORD: {
-                final List<Field> avroFields = avroSchema.getFields();
-                final List<RecordField> recordFields = new 
ArrayList<>(avroFields.size());
-
-                for (final Field field : avroFields) {
-                    final String fieldName = field.name();
-                    final Schema fieldSchema = field.schema();
-                    final DataType fieldType = determineDataType(fieldSchema);
-
-                    recordFields.add(new RecordField(fieldName, fieldType, 
field.defaultVal(), field.aliases()));
-                }
-
-                final RecordSchema recordSchema = new 
SimpleRecordSchema(recordFields, avroSchema.toString(), "avro", 
SchemaIdentifier.EMPTY);
-                return RecordFieldType.RECORD.getRecordDataType(recordSchema);
-            }
-            case NULL:
-                return RecordFieldType.STRING.getDataType();
-            case MAP:
-                final Schema valueSchema = avroSchema.getValueType();
-                final DataType valueType = determineDataType(valueSchema);
-                return RecordFieldType.MAP.getMapDataType(valueType);
-            case UNION: {
-                final List<Schema> nonNullSubSchemas = 
avroSchema.getTypes().stream()
-                    .filter(s -> s.getType() != Type.NULL)
-                    .collect(Collectors.toList());
-
-                if (nonNullSubSchemas.size() == 1) {
-                    return determineDataType(nonNullSubSchemas.get(0));
-                }
-
-                final List<DataType> possibleChildTypes = new 
ArrayList<>(nonNullSubSchemas.size());
-                for (final Schema subSchema : nonNullSubSchemas) {
-                    final DataType childDataType = 
determineDataType(subSchema);
-                    possibleChildTypes.add(childDataType);
-                }
-
-                return 
RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
-            }
-        }
-
-        return null;
-    }
-
     @Override
     public Set<SchemaField> getSuppliedSchemaFields() {
         return schemaFields;

http://git-wip-us.apache.org/repos/asf/nifi/blob/72de1cbd/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 1fdcaf7..5dc1160 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -94,6 +94,7 @@
                     <excludes combine.children="append">
                         
<exclude>src/test/resources/avro/datatypes.avsc</exclude>
                         
<exclude>src/test/resources/avro/logical-types.avsc</exclude>
+                        
<exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude>
                         
<exclude>src/test/resources/csv/extra-white-space.csv</exclude>
                         
<exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
                         
<exclude>src/test/resources/csv/single-bank-account.csv</exclude>

http://git-wip-us.apache.org/repos/asf/nifi/blob/72de1cbd/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
index d315b2e..bbb62c5 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
@@ -62,7 +62,16 @@ public class TestAvroReaderWithEmbeddedSchema {
     @Test
     public void testLogicalTypes() throws IOException, ParseException, 
MalformedRecordException, SchemaNotFoundException {
         final Schema schema = new Schema.Parser().parse(new 
File("src/test/resources/avro/logical-types.avsc"));
+        testLogicalTypes(schema);
+    }
+
+    @Test
+    public void testNullableLogicalTypes() throws IOException, ParseException, 
MalformedRecordException, SchemaNotFoundException {
+        final Schema schema = new Schema.Parser().parse(new 
File("src/test/resources/avro/logical-types-nullable.avsc"));
+        testLogicalTypes(schema);
+    }
 
+    private void testLogicalTypes(Schema schema) throws ParseException, 
IOException, MalformedRecordException {
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
         final String expectedTime = "2017-04-04 14:20:33.000";
@@ -80,7 +89,7 @@ public class TestAvroReaderWithEmbeddedSchema {
             final DataFileWriter<GenericRecord> writer = 
dataFileWriter.create(schema, baos)) {
 
             final GenericRecord record = new GenericData.Record(schema);
-            record.put("timeMillis", millisSinceMidnight);
+            record.put("timeMillis", (int) millisSinceMidnight);
             record.put("timeMicros", millisSinceMidnight * 1000L);
             record.put("timestampMillis", timeLong);
             record.put("timestampMicros", timeLong * 1000L);

http://git-wip-us.apache.org/repos/asf/nifi/blob/72de1cbd/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types-nullable.avsc
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types-nullable.avsc
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types-nullable.avsc
new file mode 100644
index 0000000..c846ee7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types-nullable.avsc
@@ -0,0 +1,69 @@
+{
+  "namespace": "nifi",
+  "name": "data_types",
+  "type": "record",
+  "fields": [
+    {
+      "name": "timeMillis",
+      "type": [
+        "null",
+        {
+          "type": "int",
+          "logicalType": "time-millis"
+        }
+      ]
+    },
+    {
+      "name": "timeMicros",
+      "type": [
+        "null",
+        {
+          "type": "long",
+          "logicalType": "time-micros"
+        }
+      ]
+    },
+    {
+      "name": "timestampMillis",
+      "type": [
+        "null",
+        {
+          "type": "long",
+          "logicalType": "timestamp-millis"
+        }
+      ]
+    },
+    {
+      "name": "timestampMicros",
+      "type": [
+        "null",
+        {
+          "type": "long",
+          "logicalType": "timestamp-micros"
+        }
+      ]
+    },
+    {
+      "name": "date",
+      "type": [
+        "null",
+        {
+          "type": "int",
+          "logicalType": "date"
+        }
+      ]
+    },
+    {
+      "name": "decimal",
+      "type": [
+        "null",
+        {
+          "type": "bytes",
+          "logicalType": "decimal",
+          "precision": 5,
+          "scale": 2
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file

Reply via email to