http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java
deleted file mode 100644
index 4449afc..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.avro;
-
-import org.apache.avro.Schema;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-
-public class AvroSchemaValidator implements Validator {
-
-    @Override
-    public ValidationResult validate(final String subject, final String input, 
final ValidationContext context) {
-        if (context.isExpressionLanguageSupported(subject) && 
context.isExpressionLanguagePresent(input)) {
-            return new ValidationResult.Builder()
-                .input(input)
-                .subject(subject)
-                .valid(true)
-                .explanation("Expression Language is present")
-                .build();
-        }
-
-        try {
-            new Schema.Parser().parse(input);
-
-            return new ValidationResult.Builder()
-                .valid(true)
-                .build();
-        } catch (final Exception e) {
-            return new ValidationResult.Builder()
-                .input(input)
-                .subject(subject)
-                .valid(false)
-                .explanation("Not a valid Avro Schema: " + e.getMessage())
-                .build();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
deleted file mode 100644
index b65026a..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.avro;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
-
-public class AvroTypeUtil {
-    public static final String AVRO_SCHEMA_FORMAT = "avro";
-
-    public static Schema extractAvroSchema(final RecordSchema recordSchema) 
throws SchemaNotFoundException {
-        final Optional<String> schemaFormatOption = 
recordSchema.getSchemaFormat();
-        if (!schemaFormatOption.isPresent()) {
-            throw new SchemaNotFoundException("No Schema Format was present in 
the RecordSchema");
-        }
-
-        final String schemaFormat = schemaFormatOption.get();
-        if (!schemaFormat.equals(AVRO_SCHEMA_FORMAT)) {
-            throw new SchemaNotFoundException("Schema provided is not in Avro 
format");
-        }
-
-        final Optional<String> textOption = recordSchema.getSchemaText();
-        if (!textOption.isPresent()) {
-            throw new SchemaNotFoundException("No Schema text was present in 
the RecordSchema");
-        }
-
-        final String text = textOption.get();
-        return new Schema.Parser().parse(text);
-    }
-
-    public static DataType determineDataType(final Schema avroSchema) {
-        final Type avroType = avroSchema.getType();
-
-        switch (avroType) {
-            case BYTES:
-            case FIXED:
-                return 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
-            case ARRAY:
-                final DataType elementType = 
determineDataType(avroSchema.getElementType());
-                return RecordFieldType.ARRAY.getArrayDataType(elementType);
-            case BOOLEAN:
-                return RecordFieldType.BOOLEAN.getDataType();
-            case DOUBLE:
-                return RecordFieldType.DOUBLE.getDataType();
-            case ENUM:
-            case STRING:
-                return RecordFieldType.STRING.getDataType();
-            case FLOAT:
-                return RecordFieldType.FLOAT.getDataType();
-            case INT: {
-                final LogicalType logicalType = avroSchema.getLogicalType();
-                if (logicalType == null) {
-                    return RecordFieldType.INT.getDataType();
-                }
-
-                if 
(LogicalTypes.date().getName().equals(logicalType.getName())) {
-                    return RecordFieldType.DATE.getDataType();
-                } else if 
(LogicalTypes.timeMillis().getName().equals(logicalType.getName())) {
-                    return RecordFieldType.TIME.getDataType();
-                }
-
-                return RecordFieldType.INT.getDataType();
-            }
-            case LONG: {
-                final LogicalType logicalType = avroSchema.getLogicalType();
-                if (logicalType == null) {
-                    return RecordFieldType.LONG.getDataType();
-                }
-
-                if 
(LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) {
-                    return RecordFieldType.TIMESTAMP.getDataType();
-                } else if 
(LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) {
-                    return RecordFieldType.TIMESTAMP.getDataType();
-                } else if 
(LogicalTypes.timeMicros().getName().equals(logicalType.getName())) {
-                    return RecordFieldType.TIME.getDataType();
-                }
-
-                return RecordFieldType.LONG.getDataType();
-            }
-            case RECORD: {
-                final List<Field> avroFields = avroSchema.getFields();
-                final List<RecordField> recordFields = new 
ArrayList<>(avroFields.size());
-
-                for (final Field field : avroFields) {
-                    final String fieldName = field.name();
-                    final Schema fieldSchema = field.schema();
-                    final DataType fieldType = determineDataType(fieldSchema);
-                    recordFields.add(new RecordField(fieldName, fieldType, 
field.defaultVal(), field.aliases()));
-                }
-
-                final RecordSchema recordSchema = new 
SimpleRecordSchema(recordFields, avroSchema.toString(), AVRO_SCHEMA_FORMAT, 
SchemaIdentifier.EMPTY);
-                return RecordFieldType.RECORD.getRecordDataType(recordSchema);
-            }
-            case NULL:
-                return RecordFieldType.STRING.getDataType();
-            case MAP:
-                final Schema valueSchema = avroSchema.getValueType();
-                final DataType valueType = determineDataType(valueSchema);
-                return RecordFieldType.MAP.getMapDataType(valueType);
-            case UNION: {
-                final List<Schema> nonNullSubSchemas = 
avroSchema.getTypes().stream()
-                    .filter(s -> s.getType() != Type.NULL)
-                    .collect(Collectors.toList());
-
-                if (nonNullSubSchemas.size() == 1) {
-                    return determineDataType(nonNullSubSchemas.get(0));
-                }
-
-                final List<DataType> possibleChildTypes = new 
ArrayList<>(nonNullSubSchemas.size());
-                for (final Schema subSchema : nonNullSubSchemas) {
-                    final DataType childDataType = 
determineDataType(subSchema);
-                    possibleChildTypes.add(childDataType);
-                }
-
-                return 
RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
-            }
-        }
-
-        return null;
-    }
-
-    public static RecordSchema createSchema(final Schema avroSchema) {
-        final List<RecordField> recordFields = new 
ArrayList<>(avroSchema.getFields().size());
-        for (final Field field : avroSchema.getFields()) {
-            final String fieldName = field.name();
-            final DataType dataType = 
AvroTypeUtil.determineDataType(field.schema());
-
-            recordFields.add(new RecordField(fieldName, dataType, 
field.defaultVal(), field.aliases()));
-        }
-
-        final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, 
avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY);
-        return recordSchema;
-    }
-
-    public static Object[] convertByteArray(final byte[] bytes) {
-        final Object[] array = new Object[bytes.length];
-        for (int i = 0; i < bytes.length; i++) {
-            array[i] = Byte.valueOf(bytes[i]);
-        }
-        return array;
-    }
-
-    public static ByteBuffer convertByteArray(final Object[] bytes) {
-        final ByteBuffer bb = ByteBuffer.allocate(bytes.length);
-        for (final Object o : bytes) {
-            if (o instanceof Byte) {
-                bb.put(((Byte) o).byteValue());
-            } else {
-                throw new IllegalTypeConversionException("Cannot convert value 
" + bytes + " of type " + bytes.getClass() + " to ByteBuffer");
-            }
-        }
-        bb.flip();
-        return bb;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
index 55f796a..c09e3d5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
@@ -17,36 +17,18 @@
 
 package org.apache.nifi.avro;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
 import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericData.EnumSymbol;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.util.DataTypeUtils;
-import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
 
 public abstract class WriteAvroResult implements RecordSetWriter {
     private final Schema schema;
@@ -59,143 +41,9 @@ public abstract class WriteAvroResult implements RecordSetWriter {
         return schema;
     }
 
-    protected GenericRecord createAvroRecord(final Record record, final Schema 
avroSchema) throws IOException {
-        final GenericRecord rec = new GenericData.Record(avroSchema);
-        final RecordSchema recordSchema = record.getSchema();
-
-        for (final RecordField recordField : recordSchema.getFields()) {
-            final Object rawValue = record.getValue(recordField);
-            final String fieldName = recordField.getFieldName();
-
-            final Field field = avroSchema.getField(fieldName);
-            if (field == null) {
-                continue;
-            }
-
-            final Object converted = convertToAvroObject(rawValue, 
field.schema(), fieldName);
-            rec.put(fieldName, converted);
-        }
-
-        return rec;
-    }
-
-    protected Object convertToAvroObject(final Object rawValue, final Schema 
fieldSchema, final String fieldName) throws IOException {
-        if (rawValue == null) {
-            return null;
-        }
-
-        switch (fieldSchema.getType()) {
-            case INT: {
-                final LogicalType logicalType = fieldSchema.getLogicalType();
-                if (logicalType == null) {
-                    return DataTypeUtils.toInteger(rawValue, fieldName);
-                }
-
-                if 
(LogicalTypes.date().getName().equals(logicalType.getName())) {
-                    final long longValue = DataTypeUtils.toLong(rawValue, 
fieldName);
-                    final Date date = new Date(longValue);
-                    final Duration duration = Duration.between(new 
Date(0L).toInstant(), date.toInstant());
-                    final long days = duration.toDays();
-                    return (int) days;
-                } else if 
(LogicalTypes.timeMillis().getName().equals(logicalType.getName())) {
-                    final long longValue = DataTypeUtils.toLong(rawValue, 
fieldName);
-                    final Date date = new Date(longValue);
-                    final Duration duration = 
Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), 
date.toInstant());
-                    final long millisSinceMidnight = duration.toMillis();
-                    return (int) millisSinceMidnight;
-                }
-
-                return DataTypeUtils.toInteger(rawValue, fieldName);
-            }
-            case LONG: {
-                final LogicalType logicalType = fieldSchema.getLogicalType();
-                if (logicalType == null) {
-                    return DataTypeUtils.toLong(rawValue, fieldName);
-                }
-
-                if 
(LogicalTypes.timeMicros().getName().equals(logicalType.getName())) {
-                    final long longValue = DataTypeUtils.toLong(rawValue, 
fieldName);
-                    final Date date = new Date(longValue);
-                    final Duration duration = 
Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), 
date.toInstant());
-                    return duration.toMillis() * 1000L;
-                } else if 
(LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) {
-                    return DataTypeUtils.toLong(rawValue, fieldName);
-                } else if 
(LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) {
-                    return DataTypeUtils.toLong(rawValue, fieldName) * 1000L;
-                }
-
-                return DataTypeUtils.toLong(rawValue, fieldName);
-            }
-            case BYTES:
-            case FIXED:
-                if (rawValue instanceof byte[]) {
-                    return ByteBuffer.wrap((byte[]) rawValue);
-                }
-                if (rawValue instanceof Object[]) {
-                    return AvroTypeUtil.convertByteArray((Object[]) rawValue);
-                } else {
-                    throw new IllegalTypeConversionException("Cannot convert 
value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer");
-                }
-            case MAP:
-                if (rawValue instanceof Record) {
-                    final Record recordValue = (Record) rawValue;
-                    final Map<String, Object> map = new HashMap<>();
-                    for (final RecordField recordField : 
recordValue.getSchema().getFields()) {
-                        final Object v = recordValue.getValue(recordField);
-                        if (v != null) {
-                            map.put(recordField.getFieldName(), v);
-                        }
-                    }
-
-                    return map;
-                } else {
-                    throw new IllegalTypeConversionException("Cannot convert 
value " + rawValue + " of type " + rawValue.getClass() + " to a Map");
-                }
-            case RECORD:
-                final GenericData.Record avroRecord = new 
GenericData.Record(fieldSchema);
-
-                final Record record = (Record) rawValue;
-                for (final RecordField recordField : 
record.getSchema().getFields()) {
-                    final Object recordFieldValue = 
record.getValue(recordField);
-                    final String recordFieldName = recordField.getFieldName();
-
-                    final Field field = fieldSchema.getField(recordFieldName);
-                    if (field == null) {
-                        continue;
-                    }
-
-                    final Object converted = 
convertToAvroObject(recordFieldValue, field.schema(), fieldName);
-                    avroRecord.put(recordFieldName, converted);
-                }
-                return avroRecord;
-            case ARRAY:
-                final Object[] objectArray = (Object[]) rawValue;
-                final List<Object> list = new ArrayList<>(objectArray.length);
-                for (final Object o : objectArray) {
-                    final Object converted = convertToAvroObject(o, 
fieldSchema.getElementType(), fieldName);
-                    list.add(converted);
-                }
-                return list;
-            case BOOLEAN:
-                return DataTypeUtils.toBoolean(rawValue, fieldName);
-            case DOUBLE:
-                return DataTypeUtils.toDouble(rawValue, fieldName);
-            case FLOAT:
-                return DataTypeUtils.toFloat(rawValue, fieldName);
-            case NULL:
-                return null;
-            case ENUM:
-                return new EnumSymbol(fieldSchema, rawValue);
-            case STRING:
-                return DataTypeUtils.toString(rawValue, 
RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat());
-        }
-
-        return rawValue;
-    }
-
     @Override
     public WriteResult write(final Record record, final OutputStream out) 
throws IOException {
-        final GenericRecord rec = createAvroRecord(record, schema);
+        final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, 
schema);
 
         final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
         try (final DataFileWriter<GenericRecord> dataFileWriter = new 
DataFileWriter<>(datumWriter)) {
@@ -206,7 +54,6 @@ public abstract class WriteAvroResult implements RecordSetWriter {
         return WriteResult.of(1, Collections.emptyMap());
     }
 
-
     @Override
     public String getMimeType() {
         return "application/avro-binary";

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
index 74306e4..ba14b3a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
@@ -61,7 +61,7 @@ public class WriteAvroResultWithExternalSchema extends WriteAvroResult {
         final BinaryEncoder encoder = 
EncoderFactory.get().blockingBinaryEncoder(bufferedOut, null);
 
         do {
-            final GenericRecord rec = createAvroRecord(record, schema);
+            final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, 
schema);
 
             datumWriter.write(rec, encoder);
             encoder.flush();

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
index dca8aac..d55a5dd 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
@@ -51,7 +51,7 @@ public class WriteAvroResultWithSchema extends WriteAvroResult {
             dataFileWriter.create(schema, outStream);
 
             do {
-                final GenericRecord rec = createAvroRecord(record, schema);
+                final GenericRecord rec = 
AvroTypeUtil.createAvroRecord(record, schema);
                 dataFileWriter.append(rec);
                 nrOfRows++;
             } while ((record = rs.next()) != null);

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
index 71093de..a57f10b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
@@ -41,8 +41,14 @@ import org.apache.nifi.serialization.record.RecordSchema;
 public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy {
     private static final Set<SchemaField> schemaFields = 
EnumSet.noneOf(SchemaField.class);
 
+    private final ConfigurationContext context;
+
+    public CSVHeaderSchemaStrategy(final ConfigurationContext context) {
+        this.context = context;
+    }
+
     @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream 
contentStream, final ConfigurationContext context) throws 
SchemaNotFoundException {
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream 
contentStream) throws SchemaNotFoundException {
         try {
             final CSVFormat csvFormat = 
CSVUtils.createCSVFormat(context).withFirstRecordAsHeader();
             try (final Reader reader = new InputStreamReader(new 
BOMInputStream(contentStream));

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
index fb34f8f..9fe4136 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -51,7 +51,8 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
     private final AllowableValue headerDerivedAllowableValue = new 
AllowableValue("csv-header-derived", "Use String Fields From Header",
         "The first non-comment line of the CSV file is a header line that 
contains the names of the columns. The schema will be derived by using the "
             + "column names in the header and assuming that all columns are of 
type String.");
-    private final SchemaAccessStrategy headerDerivedSchemaStrategy = new 
CSVHeaderSchemaStrategy();
+
+    private volatile SchemaAccessStrategy headerDerivedSchemaStrategy;
 
     private volatile CSVFormat csvFormat;
     private volatile String dateFormat;
@@ -96,12 +97,12 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
     }
 
     @Override
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String 
allowableValue, final SchemaRegistry schemaRegistry) {
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String 
allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext 
context) {
         if 
(allowableValue.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) {
-            return headerDerivedSchemaStrategy;
+            return new CSVHeaderSchemaStrategy(context);
         }
 
-        return super.getSchemaAccessStrategy(allowableValue, schemaRegistry);
+        return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, 
context);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
index 1048d21..13afe30 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
@@ -176,7 +176,8 @@ public class CSVUtils {
             .withAllowMissingColumnNames()
             .withIgnoreEmptyLines();
 
-        if (context.getProperty(SKIP_HEADER_LINE).asBoolean()) {
+        final PropertyValue skipHeaderPropertyValue = 
context.getProperty(SKIP_HEADER_LINE);
+        if (skipHeaderPropertyValue.getValue() != null && 
skipHeaderPropertyValue.asBoolean()) {
             format = format.withFirstRecordAsHeader();
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index 778c738..6c8deab 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -178,13 +178,13 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
     }
 
     @Override
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry) {
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
         if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) {
             return new SchemaAccessStrategy() {
                 private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
 
                 @Override
-                public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final ConfigurationContext context) throws SchemaNotFoundException {
+                public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
                     return recordSchema;
                 }
 
@@ -194,7 +194,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
                 }
             };
         } else {
-            return super.getSchemaAccessStrategy(allowableValue, schemaRegistry);
+            return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java
deleted file mode 100644
index 27f84e4..0000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.schema.access;
-
-import java.io.InputStream;
-import java.util.EnumSet;
-import java.util.Set;
-
-import org.apache.avro.Schema;
-import org.apache.nifi.avro.AvroTypeUtil;
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AvroSchemaTextStrategy implements SchemaAccessStrategy {
-    private static final Set<SchemaField> schemaFields = 
EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
-
-    private static final Logger logger = 
LoggerFactory.getLogger(AvroSchemaTextStrategy.class);
-    private final PropertyValue schemaTextPropertyValue;
-
-    public AvroSchemaTextStrategy(final PropertyValue schemaTextPropertyValue) 
{
-        this.schemaTextPropertyValue = schemaTextPropertyValue;
-    }
-
-    @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream 
contentStream, final ConfigurationContext context) throws 
SchemaNotFoundException {
-        final String schemaText = 
schemaTextPropertyValue.evaluateAttributeExpressions(flowFile).getValue();
-        if (schemaText == null || schemaText.trim().isEmpty()) {
-            throw new SchemaNotFoundException("FlowFile did not contain 
appropriate attributes to determine Schema Text");
-        }
-
-        logger.debug("For {} found schema text {}", flowFile, schemaText);
-
-        try {
-            final Schema avroSchema = new Schema.Parser().parse(schemaText);
-            return AvroTypeUtil.createSchema(avroSchema);
-        } catch (final Exception e) {
-            throw new SchemaNotFoundException("Failed to create schema from 
the Schema Text after evaluating FlowFile Attributes", e);
-        }
-    }
-
-    @Override
-    public Set<SchemaField> getSuppliedSchemaFields() {
-        return schemaFields;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
deleted file mode 100644
index 4eec14e..0000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.schema.access;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-public class HortonworksAttributeSchemaReferenceStrategy implements 
SchemaAccessStrategy {
-    private final Set<SchemaField> schemaFields;
-
-    public static final String SCHEMA_ID_ATTRIBUTE = "schema.identifier";
-    public static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version";
-    public static final String SCHEMA_PROTOCOL_VERSION_ATTRIBUTE = 
"schema.protocol.version";
-
-    private final SchemaRegistry schemaRegistry;
-
-
-    public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry 
schemaRegistry) {
-        this.schemaRegistry = schemaRegistry;
-
-        schemaFields = new HashSet<>();
-        schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
-        schemaFields.add(SchemaField.SCHEMA_VERSION);
-        schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : 
schemaRegistry.getSuppliedSchemaFields());
-    }
-
-    @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream 
contentStream, final ConfigurationContext context) throws 
SchemaNotFoundException, IOException {
-        final String schemaIdentifier = 
flowFile.getAttribute(SCHEMA_ID_ATTRIBUTE);
-        final String schemaVersion = 
flowFile.getAttribute(SCHEMA_VERSION_ATTRIBUTE);
-        final String schemaProtocol = 
flowFile.getAttribute(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
-        if (schemaIdentifier == null || schemaVersion == null || 
schemaProtocol == null) {
-            throw new SchemaNotFoundException("Could not determine Schema for 
" + flowFile + " because it is missing one of the following three required 
attributes: "
-                + SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " 
+ SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
-        }
-
-        if (!isNumber(schemaProtocol)) {
-            throw new SchemaNotFoundException("Could not determine Schema for 
" + flowFile + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a 
value of '"
-                + schemaProtocol + "', which is not a valid Protocol Version 
number");
-        }
-
-        final int protocol = Integer.parseInt(schemaProtocol);
-        if (protocol != 1) {
-            throw new SchemaNotFoundException("Could not determine Schema for 
" + flowFile + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a 
value of '"
-                + schemaProtocol + "', which is not a valid Protocol Version 
number. Expected Protocol Version to be 1.");
-        }
-
-        if (!isNumber(schemaIdentifier)) {
-            throw new SchemaNotFoundException("Could not determine Schema for 
" + flowFile + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
-                + schemaProtocol + "', which is not a valid Schema Identifier 
number");
-        }
-
-        if (!isNumber(schemaVersion)) {
-            throw new SchemaNotFoundException("Could not determine Schema for 
" + flowFile + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
-                + schemaProtocol + "', which is not a valid Schema Version 
number");
-        }
-
-        final long schemaId = Long.parseLong(schemaIdentifier);
-        final int version = Integer.parseInt(schemaVersion);
-
-        final RecordSchema schema = schemaRegistry.retrieveSchema(schemaId, 
version);
-        if (schema == null) {
-            throw new SchemaNotFoundException("Could not find a Schema in the 
Schema Registry with Schema Identifier '" + schemaId + "' and Version '" + 
version + "'");
-        }
-
-        return schema;
-    }
-
-    private static boolean isNumber(final String value) {
-        if (value == null) {
-            return false;
-        }
-
-        final String trimmed = value.trim();
-        if (value.isEmpty()) {
-            return false;
-        }
-
-        for (int i = 0; i < trimmed.length(); i++) {
-            final char c = value.charAt(i);
-            if (c > '9' || c < '0') {
-                return false;
-            }
-        }
-
-        return true;
-    }
-
-    @Override
-    public Set<SchemaField> getSuppliedSchemaFields() {
-        return schemaFields;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
deleted file mode 100644
index f492ec4..0000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.schema.access;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
-public class HortonworksAttributeSchemaReferenceWriter implements 
SchemaAccessWriter {
-    private static final Set<SchemaField> requiredSchemaFields = 
EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
-    private static final int LATEST_PROTOCOL_VERSION = 1;
-
-    @Override
-    public void writeHeader(RecordSchema schema, OutputStream out) throws 
IOException {
-    }
-
-    @Override
-    public Map<String, String> getAttributes(final RecordSchema schema) {
-        final Map<String, String> attributes = new HashMap<>(4);
-        final SchemaIdentifier id = schema.getIdentifier();
-
-        final long schemaId = id.getIdentifier().getAsLong();
-        final int schemaVersion = id.getVersion().getAsInt();
-
-        
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, 
String.valueOf(schemaId));
-        
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE,
 String.valueOf(schemaVersion));
-        
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE,
 String.valueOf(LATEST_PROTOCOL_VERSION));
-
-        return attributes;
-    }
-
-    @Override
-    public void validateSchema(final RecordSchema schema) throws 
SchemaNotFoundException {
-        final SchemaIdentifier id = schema.getIdentifier();
-        if (!id.getIdentifier().isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Schema Reference 
as Attributes because it does not contain a Schema Identifier");
-        }
-        if (!id.getVersion().isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Schema Reference 
as Attributes because it does not contain a Schema Version");
-        }
-    }
-
-    @Override
-    public Set<SchemaField> getRequiredSchemaFields() {
-        return requiredSchemaFields;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
deleted file mode 100644
index 081e97c..0000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.schema.access;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.stream.io.StreamUtils;
-
-public class HortonworksEncodedSchemaReferenceStrategy implements 
SchemaAccessStrategy {
-    private static final int LATEST_PROTOCOL_VERSION = 1;
-
-    private final Set<SchemaField> schemaFields;
-    private final SchemaRegistry schemaRegistry;
-
-    public HortonworksEncodedSchemaReferenceStrategy(final SchemaRegistry 
schemaRegistry) {
-        this.schemaRegistry = schemaRegistry;
-
-        schemaFields = new HashSet<>();
-        schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
-        schemaFields.add(SchemaField.SCHEMA_VERSION);
-        schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : 
schemaRegistry.getSuppliedSchemaFields());
-    }
-
-    @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream 
contentStream, final ConfigurationContext context) throws 
SchemaNotFoundException, IOException {
-        final byte[] buffer = new byte[13];
-        try {
-            StreamUtils.fillBuffer(contentStream, buffer);
-        } catch (final IOException ioe) {
-            throw new SchemaNotFoundException("Could not read first 13 bytes 
from stream", ioe);
-        }
-
-        // This encoding follows the pattern that is provided for serializing 
data by the Hortonworks Schema Registry serializer
-        // as it is provided at:
-        // 
https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
-        final ByteBuffer bb = ByteBuffer.wrap(buffer);
-        final int protocolVersion = bb.get();
-        if (protocolVersion != 1) {
-            throw new SchemaNotFoundException("Schema Encoding appears to be 
of an incompatible version. The latest known Protocol is Version "
-                + LATEST_PROTOCOL_VERSION + " but the data was encoded with 
version " + protocolVersion);
-        }
-
-        final long schemaId = bb.getLong();
-        final int schemaVersion = bb.getInt();
-
-        return schemaRegistry.retrieveSchema(schemaId, schemaVersion);
-    }
-
-    @Override
-    public Set<SchemaField> getSuppliedSchemaFields() {
-        return schemaFields;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
deleted file mode 100644
index bf6a9ea..0000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.schema.access;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.OptionalInt;
-import java.util.OptionalLong;
-import java.util.Set;
-
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
-public class HortonworksEncodedSchemaReferenceWriter implements 
SchemaAccessWriter {
-    private static final Set<SchemaField> requiredSchemaFields = 
EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
-    private static final int LATEST_PROTOCOL_VERSION = 1;
-
-    @Override
-    public void writeHeader(final RecordSchema schema, final OutputStream out) 
throws IOException {
-        final SchemaIdentifier identifier = schema.getIdentifier();
-        final long id = identifier.getIdentifier().getAsLong();
-        final int version = identifier.getVersion().getAsInt();
-
-        // This decoding follows the pattern that is provided for serializing 
data by the Hortonworks Schema Registry serializer
-        // as it is provided at:
-        // 
https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
-        final ByteBuffer bb = ByteBuffer.allocate(13);
-        bb.put((byte) LATEST_PROTOCOL_VERSION);
-        bb.putLong(id);
-        bb.putInt(version);
-
-        out.write(bb.array());
-    }
-
-    @Override
-    public Map<String, String> getAttributes(final RecordSchema schema) {
-        return Collections.emptyMap();
-    }
-
-    @Override
-    public void validateSchema(RecordSchema schema) throws 
SchemaNotFoundException {
-        final SchemaIdentifier identifier = schema.getIdentifier();
-        final OptionalLong identifierOption = identifier.getIdentifier();
-        if (!identifierOption.isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Encoded Schema 
Reference because the Schema Identifier is not known");
-        }
-
-        final OptionalInt versionOption = identifier.getVersion();
-        if (!versionOption.isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Encoded Schema 
Reference because the Schema Version is not known");
-        }
-    }
-
-    @Override
-    public Set<SchemaField> getRequiredSchemaFields() {
-        return requiredSchemaFields;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
deleted file mode 100644
index 6635e3d..0000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.schema.access;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Set;
-
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-public interface SchemaAccessStrategy {
-    /**
-     * Returns the schema for the given FlowFile using the supplied stream of 
content and configuration
-     *
-     * @param flowFile flowfile
-     * @param contentStream content of flowfile
-     * @param context configuration
-     * @return the RecordSchema for the FlowFile
-     */
-    RecordSchema getSchema(FlowFile flowFile, InputStream contentStream, 
ConfigurationContext context) throws SchemaNotFoundException, IOException;
-
-    /**
-     * @return the set of all Schema Fields that are supplied by the 
RecordSchema that is returned from {@link #getSchema(FlowFile, InputStream, 
ConfigurationContext)}.
-     */
-    Set<SchemaField> getSuppliedSchemaFields();
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java
deleted file mode 100644
index 30a995c..0000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.schema.access;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.nifi.serialization.record.RecordSchema;
-
-public interface SchemaAccessWriter {
-
-    /**
-     * Writes the given Record Schema to the given OutputStream as header 
information, if appropriate,
-     * or returns without writing anything if the implementation does not need 
to write information to
-     * the contents of the FlowFile
-     *
-     * @param schema the schema to write
-     * @param out the OutputStream to write to
-     * @throws IOException if unable to write to the given stream
-     */
-    void writeHeader(RecordSchema schema, OutputStream out) throws IOException;
-
-    /**
-     * Returns a Map of String to String that represent the attributes that 
should be added to the FlowFile, or
-     * an empty map if no attributes should be added.
-     *
-     * @return a Map of attributes to add to the FlowFile.
-     */
-    Map<String, String> getAttributes(RecordSchema schema);
-
-    /**
-     * Ensures that the given schema can be written by this SchemaAccessWriter 
or throws SchemaNotFoundException if
-     * the schema does not contain sufficient information to be written
-     *
-     * @param schema the schema to validate
-     * @throws SchemaNotFoundException if the schema does not contain 
sufficient information to be written
-     */
-    void validateSchema(RecordSchema schema) throws SchemaNotFoundException;
-
-    /**
-     * Specifies the set of SchemaField's that are required in order to use 
this Schema Access Writer
-     *
-     * @return the set of SchemaField's that are required in order to use this 
Schema Access Writer
-     */
-    Set<SchemaField> getRequiredSchemaFields();
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
deleted file mode 100644
index 54a248d..0000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.schema.access;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
-/**
- * A {@link SchemaAccessWriter} that conveys a schema solely through its name, which is
- * exposed as the {@code schema.name} FlowFile attribute. Nothing is written to the content
- * stream.
- */
-public class SchemaNameAsAttribute implements SchemaAccessWriter {
-    private static final String SCHEMA_NAME_ATTRIBUTE = "schema.name";
-    private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME);
-
-    /**
-     * Intentionally a no-op: this writer communicates the schema via attributes only.
-     *
-     * @param schema the schema being written (unused)
-     * @param out the destination stream (left untouched)
-     */
-    @Override
-    public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
-        // Nothing to write; see getAttributes.
-    }
-
-    /**
-     * Builds the attribute map carrying the schema's name.
-     *
-     * @param schema the schema whose identifier supplies the name
-     * @return a single-entry map of {@code schema.name} to the schema's name, or an empty
-     *         map when the schema's identifier has no name
-     */
-    @Override
-    public Map<String, String> getAttributes(final RecordSchema schema) {
-        final Optional<String> schemaName = schema.getIdentifier().getName();
-        return schemaName
-            .map(name -> Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, name))
-            .orElseGet(Collections::emptyMap);
-    }
-
-    /**
-     * Verifies that the schema's identifier carries a name.
-     *
-     * @param schema the schema to check
-     * @throws SchemaNotFoundException if the schema's identifier has no name
-     */
-    @Override
-    public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
-        final boolean nameKnown = schema.getIdentifier().getName().isPresent();
-        if (nameKnown) {
-            return;
-        }
-
-        throw new SchemaNotFoundException("Cannot write Schema Name As Attribute because the Schema Name is not known");
-    }
-
-    /**
-     * @return the fields this writer needs from a schema: only {@link SchemaField#SCHEMA_NAME}
-     */
-    @Override
-    public Set<SchemaField> getRequiredSchemaFields() {
-        return schemaFields;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
deleted file mode 100644
index bc21c1d..0000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.schema.access;
-
-import java.io.InputStream;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-/**
- * A {@link SchemaAccessStrategy} that evaluates a configured property against the FlowFile
- * to obtain a schema name and then looks that name up in a {@link SchemaRegistry}.
- */
-public class SchemaNamePropertyStrategy implements SchemaAccessStrategy {
-    private final Set<SchemaField> schemaFields;
-
-    private final SchemaRegistry schemaRegistry;
-    private final PropertyValue schemaNamePropertyValue;
-
-    /**
-     * @param schemaRegistry the registry to query; may be {@code null}, in which case only
-     *            {@link SchemaField#SCHEMA_NAME} is reported as supplied
-     * @param schemaNamePropertyValue the property whose evaluated value is the schema name
-     */
-    public SchemaNamePropertyStrategy(final SchemaRegistry schemaRegistry, final PropertyValue schemaNamePropertyValue) {
-        this.schemaRegistry = schemaRegistry;
-        this.schemaNamePropertyValue = schemaNamePropertyValue;
-
-        schemaFields = new HashSet<>();
-        schemaFields.add(SchemaField.SCHEMA_NAME);
-        schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
-    }
-
-    /**
-     * Resolves the schema name from the FlowFile and retrieves the matching schema.
-     *
-     * @param flowFile the FlowFile against which the schema-name property is evaluated
-     * @param contentStream the FlowFile content (not read by this strategy)
-     * @param context the configuration context (not read by this strategy)
-     * @return the schema registered under the resolved name; never {@code null}
-     * @throws SchemaNotFoundException if the resolved name is missing or blank, if the registry
-     *             lookup fails, or if the registry has no schema with that name
-     */
-    @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final ConfigurationContext context) throws SchemaNotFoundException {
-        final String schemaName = schemaNamePropertyValue.evaluateAttributeExpressions(flowFile).getValue();
-        // NOTE(review): getValue() presumably yields null for an unset property - confirm. Guarding
-        // here turns what would be a NullPointerException into the declared exception.
-        if (schemaName == null || schemaName.trim().isEmpty()) {
-            throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Name.");
-        }
-
-        final RecordSchema recordSchema;
-        try {
-            // A null registry (permitted by the constructor) fails here and is reported as a lookup failure.
-            recordSchema = schemaRegistry.retrieveSchema(schemaName);
-        } catch (final Exception e) {
-            throw new SchemaNotFoundException("Could not retrieve schema with name '" + schemaName + "' from the configured Schema Registry", e);
-        }
-
-        // Checked outside the try block so this exception is not caught and re-wrapped
-        // under the "Could not retrieve" message.
-        if (recordSchema == null) {
-            throw new SchemaNotFoundException("Could not find a schema with name '" + schemaName + "' in the configured Schema Registry");
-        }
-
-        return recordSchema;
-    }
-
-    /**
-     * @return {@link SchemaField#SCHEMA_NAME} plus whatever fields the registry reported as
-     *         supplied at construction time
-     */
-    @Override
-    public Set<SchemaField> getSuppliedSchemaFields() {
-        return schemaFields;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
deleted file mode 100644
index f39bdca..0000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.schema.access;
-
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-import org.apache.nifi.serialization.record.RecordSchema;
-
-/**
- * A {@link SchemaAccessWriter} that conveys a schema by placing its full text in a FlowFile
- * attribute named {@code <format>.schema}, where {@code <format>} is the schema's text format.
- * Nothing is written to the content stream.
- */
-public class SchemaTextAsAttribute implements SchemaAccessWriter {
-    private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
-
-    /**
-     * Intentionally a no-op: this writer communicates the schema via attributes only.
-     *
-     * @param schema the schema being written (unused)
-     * @param out the destination stream (left untouched)
-     */
-    @Override
-    public void writeHeader(final RecordSchema schema, final OutputStream out) {
-    }
-
-    /**
-     * Builds the attribute map carrying the schema's text.
-     *
-     * @param schema the schema supplying the text and its format
-     * @return a single-entry map of {@code <format>.schema} to the schema text, or an empty map
-     *         when either the format or the text is absent
-     */
-    @Override
-    public Map<String, String> getAttributes(final RecordSchema schema) {
-        final Optional<String> textFormatOption = schema.getSchemaFormat();
-        final Optional<String> textOption = schema.getSchemaText();
-
-        // Without both values there is nothing to add; return an empty map instead of letting
-        // Optional.get() throw NoSuchElementException. validateSchema reports the missing field.
-        if (!textFormatOption.isPresent() || !textOption.isPresent()) {
-            return Collections.emptyMap();
-        }
-
-        return Collections.singletonMap(textFormatOption.get() + ".schema", textOption.get());
-    }
-
-    /**
-     * Verifies that the schema carries both a text format and the text itself.
-     *
-     * @param schema the schema to check
-     * @throws SchemaNotFoundException if the schema's text format or text is not present
-     */
-    @Override
-    public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
-        final Optional<String> textFormatOption = schema.getSchemaFormat();
-        if (!textFormatOption.isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text Format is not present");
-        }
-
-        final Optional<String> textOption = schema.getSchemaText();
-        if (!textOption.isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text is not present");
-        }
-    }
-
-    /**
-     * @return the fields this writer needs from a schema: {@link SchemaField#SCHEMA_TEXT} and
-     *         {@link SchemaField#SCHEMA_TEXT_FORMAT}
-     */
-    @Override
-    public Set<SchemaField> getRequiredSchemaFields() {
-        return schemaFields;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
deleted file mode 100644
index d5ab8c5..0000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.serialization.record.RecordFieldType;
-
-/**
- * Shared property descriptors for configuring the formats used when reading and writing
- * Date, Time and Timestamp record fields.
- */
-public class DateTimeUtils {
-    /** Format for Date fields; defaults to the default format of {@link RecordFieldType#DATE}. */
-    public static final PropertyDescriptor DATE_FORMAT = formatProperty(
-        "Date Format",
-        "Specifies the format to use when reading/writing Date fields",
-        RecordFieldType.DATE.getDefaultFormat());
-
-    /** Format for Time fields; defaults to the default format of {@link RecordFieldType#TIME}. */
-    public static final PropertyDescriptor TIME_FORMAT = formatProperty(
-        "Time Format",
-        "Specifies the format to use when reading/writing Time fields",
-        RecordFieldType.TIME.getDefaultFormat());
-
-    /** Format for Timestamp fields; defaults to the default format of {@link RecordFieldType#TIMESTAMP}. */
-    public static final PropertyDescriptor TIMESTAMP_FORMAT = formatProperty(
-        "Timestamp Format",
-        "Specifies the format to use when reading/writing Timestamp fields",
-        RecordFieldType.TIMESTAMP.getDefaultFormat());
-
-    /**
-     * Builds a required format property that does not support Expression Language and is
-     * validated by a {@link SimpleDateFormatValidator}.
-     *
-     * @param name the property's name
-     * @param description the property's description
-     * @param defaultFormat the property's default value
-     * @return the built descriptor
-     */
-    private static PropertyDescriptor formatProperty(final String name, final String description, final String defaultFormat) {
-        return new PropertyDescriptor.Builder()
-            .name(name)
-            .description(description)
-            .expressionLanguageSupported(false)
-            .defaultValue(defaultFormat)
-            .addValidator(new SimpleDateFormatValidator())
-            .required(true)
-            .build();
-    }
-}

Reply via email to