Repository: nifi
Updated Branches:
  refs/heads/master c86190c51 -> 45e035686
NIFI-4029: Allow null Avro default values in HortonworksSchemaRegistry This closes #1894. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/45e03568 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/45e03568 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/45e03568 Branch: refs/heads/master Commit: 45e035686f0a51c29629f6f6dfa1d26496e21997 Parents: c86190c Author: Steve Champagne <[email protected]> Authored: Wed Jun 7 12:36:28 2017 +0000 Committer: Bryan Bende <[email protected]> Committed: Wed Jun 7 12:03:53 2017 -0400 ---------------------------------------------------------------------- .../java/org/apache/nifi/avro/AvroTypeUtil.java | 6 +- .../nifi-hwx-schema-registry-service/pom.xml | 4 + .../hortonworks/HortonworksSchemaRegistry.java | 122 +------------------ 3 files changed, 13 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/45e03568/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index 52c55fc..1417e67 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -152,7 +152,11 @@ public class AvroTypeUtil { final Schema fieldSchema = field.schema(); final DataType fieldType = 
determineDataType(fieldSchema); - recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases())); + if (field.defaultVal() == JsonProperties.NULL_VALUE) { + recordFields.add(new RecordField(fieldName, fieldType, field.aliases())); + } else { + recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases())); + } } final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY); http://git-wip-us.apache.org/repos/asf/nifi/blob/45e03568/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml index 79dbc84..38e175c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml @@ -43,6 +43,10 @@ limitations under the License. 
</dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-avro-record-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-schema-registry-service-api</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/45e03568/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java index d2289a2..3027e5f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java @@ -25,12 +25,9 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import org.apache.avro.LogicalType; import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Type; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnDisabled; @@ -42,10 
+39,6 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaField; import org.apache.nifi.schemaregistry.services.SchemaRegistry; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.SchemaIdentifier; import org.apache.nifi.util.Tuple; @@ -261,7 +254,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText); return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> { final Schema schema = new Schema.Parser().parse(schemaText); - return createRecordSchema(schema, schemaText, schemaIdentifier); + return AvroTypeUtil.createSchema(schema, schemaText, schemaIdentifier); }); } @@ -309,120 +302,13 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText); return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> { final Schema schema = new Schema.Parser().parse(schemaText); - return createRecordSchema(schema, schemaText, schemaIdentifier); + return AvroTypeUtil.createSchema(schema, schemaText, schemaIdentifier); }); } - /** - * Converts an Avro Schema to a RecordSchema - * - * @param avroSchema the Avro Schema to convert - * @param text the textual representation of the schema - * @param schemaId the id of the schema - * @return the Corresponding Record Schema - */ - private RecordSchema createRecordSchema(final Schema avroSchema, final String text, final SchemaIdentifier schemaId) { - final List<RecordField> recordFields = new 
ArrayList<>(avroSchema.getFields().size()); - for (final Field field : avroSchema.getFields()) { - final String fieldName = field.name(); - final DataType dataType = determineDataType(field.schema()); - - recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases())); - } - - final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, text, "avro", schemaId); - return recordSchema; - } - - /** - * Returns a DataType for the given Avro Schema - * - * @param avroSchema the Avro Schema to convert - * @return a Data Type that corresponds to the given Avro Schema - */ - private DataType determineDataType(final Schema avroSchema) { - final Type avroType = avroSchema.getType(); - - final LogicalType logicalType = avroSchema.getLogicalType(); - if (logicalType != null) { - final String logicalTypeName = logicalType.getName(); - switch (logicalTypeName) { - case LOGICAL_TYPE_DATE: - return RecordFieldType.DATE.getDataType(); - case LOGICAL_TYPE_TIME_MILLIS: - case LOGICAL_TYPE_TIME_MICROS: - return RecordFieldType.TIME.getDataType(); - case LOGICAL_TYPE_TIMESTAMP_MILLIS: - case LOGICAL_TYPE_TIMESTAMP_MICROS: - return RecordFieldType.TIMESTAMP.getDataType(); - } - } - - switch (avroType) { - case ARRAY: - return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType())); - case BYTES: - case FIXED: - return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); - case BOOLEAN: - return RecordFieldType.BOOLEAN.getDataType(); - case DOUBLE: - return RecordFieldType.DOUBLE.getDataType(); - case ENUM: - case STRING: - return RecordFieldType.STRING.getDataType(); - case FLOAT: - return RecordFieldType.FLOAT.getDataType(); - case INT: - return RecordFieldType.INT.getDataType(); - case LONG: - return RecordFieldType.LONG.getDataType(); - case RECORD: { - final List<Field> avroFields = avroSchema.getFields(); - final List<RecordField> recordFields = new ArrayList<>(avroFields.size()); - - for 
(final Field field : avroFields) { - final String fieldName = field.name(); - final Schema fieldSchema = field.schema(); - final DataType fieldType = determineDataType(fieldSchema); - recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases())); - } - - final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), "avro", SchemaIdentifier.EMPTY); - return RecordFieldType.RECORD.getRecordDataType(recordSchema); - } - case NULL: - return RecordFieldType.STRING.getDataType(); - case MAP: - final Schema valueSchema = avroSchema.getValueType(); - final DataType valueType = determineDataType(valueSchema); - return RecordFieldType.MAP.getMapDataType(valueType); - case UNION: { - final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream() - .filter(s -> s.getType() != Type.NULL) - .collect(Collectors.toList()); - - if (nonNullSubSchemas.size() == 1) { - return determineDataType(nonNullSubSchemas.get(0)); - } - - final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size()); - for (final Schema subSchema : nonNullSubSchemas) { - final DataType childDataType = determineDataType(subSchema); - possibleChildTypes.add(childDataType); - } - - return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes); - } - } - - return null; - } - - @Override public Set<SchemaField> getSuppliedSchemaFields() { return schemaFields; } -} \ No newline at end of file +}
