http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
new file mode 100644
index 0000000..6676ee6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.exception.FailureException;
+import 
org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.BufferedInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+
+/**
+ * Base class for processors that write Records to HDFS.
+ */
+@TriggerWhenEmpty // trigger when empty so we have a chance to perform a 
Kerberos re-login
+@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield since 
we are triggering when empty
+public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
+
+
+    public static final PropertyDescriptor COMPRESSION_TYPE = new 
PropertyDescriptor.Builder()
+            .name("compression-type")
+            .displayName("Compression Type")
+            .description("The type of compression for the file being written.")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor OVERWRITE = new 
PropertyDescriptor.Builder()
+            .name("overwrite")
+            .displayName("Overwrite Files")
+            .description("Whether or not to overwrite existing files in the 
same directory with the same name. When set to false, " +
+                    "flow files will be routed to failure when a file exists 
in the same directory with the same name.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor UMASK = new 
PropertyDescriptor.Builder()
+            .name("permissions-umask")
+            .displayName("Permissions umask")
+            .description("A umask represented as an octal number which 
determines the permissions of files written to HDFS. " +
+                    "This overrides the Hadoop Configuration dfs.umaskmode")
+            .addValidator(HadoopValidators.UMASK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REMOTE_OWNER = new 
PropertyDescriptor.Builder()
+            .name("remote-owner")
+            .displayName("Remote Owner")
+            .description("Changes the owner of the HDFS file to this value 
after it is written. " +
+                    "This only works if NiFi is running as a user that has 
HDFS super user privilege to change owner")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REMOTE_GROUP = new 
PropertyDescriptor.Builder()
+            .name("remote-group")
+            .displayName("Remote Group")
+            .description("Changes the group of the HDFS file to this value 
after it is written. " +
+                    "This only works if NiFi is running as a user that has 
HDFS super user privilege to change group")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The service for reading records from incoming flow 
files.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Flow Files that have been successfully processed are 
transferred to this relationship")
+            .build();
+
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("Flow Files that could not be processed due to issues 
that can be retried are transferred to this relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Flow Files that could not be processed due to issue 
that cannot be retried are transferred to this relationship")
+            .build();
+
+    public static final String RECORD_COUNT_ATTR = "record.count";
+
+    private volatile String remoteOwner;
+    private volatile String remoteGroup;
+    private volatile SchemaAccessStrategy schemaAccessStrategy;
+
+    private volatile Set<Relationship> putHdfsRecordRelationships;
+    private volatile List<PropertyDescriptor> putHdfsRecordProperties;
+
+    private final List<AllowableValue> strategyList = 
Collections.unmodifiableList(Arrays.asList(
+            SCHEMA_NAME_PROPERTY,
+            SCHEMA_TEXT_PROPERTY,
+            HWX_SCHEMA_REF_ATTRIBUTES,
+            HWX_CONTENT_ENCODED_SCHEMA
+    ));
+
+
+    @Override
+    protected final void init(final ProcessorInitializationContext context) {
+        super.init(context);
+
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_RETRY);
+        rels.add(REL_FAILURE);
+        this.putHdfsRecordRelationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> props = new ArrayList<>(properties);
+        props.add(RECORD_READER);
+
+        props.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(DIRECTORY)
+                .description("The parent directory to which files should be 
written. Will be created if it doesn't exist.")
+                .build());
+
+        final AllowableValue[] strategies = 
getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
+
+        props.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
+                .description("Specifies how to obtain the schema that is to be 
used for writing the data.")
+                .allowableValues(strategies)
+                .defaultValue(getDefaultSchemaAccessStrategy().getValue())
+                .build());
+
+        props.add(SCHEMA_REGISTRY);
+        props.add(SCHEMA_NAME);
+        props.add(SCHEMA_TEXT);
+
+        final AllowableValue[] compressionTypes = 
getCompressionTypes(context).toArray(new AllowableValue[0]);
+
+        props.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(COMPRESSION_TYPE)
+                .allowableValues(compressionTypes)
+                .defaultValue(getDefaultCompressionType(context))
+                .build());
+
+        props.add(OVERWRITE);
+        props.add(UMASK);
+        props.add(REMOTE_GROUP);
+        props.add(REMOTE_OWNER);
+        props.addAll(getAdditionalProperties());
+        this.putHdfsRecordProperties = Collections.unmodifiableList(props);
+    }
+
+    protected List<AllowableValue> getSchemaAccessStrategyValues() {
+        return strategyList;
+    }
+
+    protected AllowableValue getDefaultSchemaAccessStrategy() {
+        return SCHEMA_NAME_PROPERTY;
+    }
+
+    private PropertyDescriptor getSchemaAcessStrategyDescriptor() {
+        return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
+    }
+
+    /**
+     * @param context the initialization context
+     * @return the possible compression types
+     */
+    public abstract List<AllowableValue> getCompressionTypes(final 
ProcessorInitializationContext context);
+
+    /**
+     * @param context the initialization context
+     * @return the default compression type
+     */
+    public abstract String getDefaultCompressionType(final 
ProcessorInitializationContext context);
+
+    /**
+     * Allows sub-classes to add additional properties, called from initialize.
+     *
+     * @return additional properties to add to the overall list
+     */
+    public List<PropertyDescriptor> getAdditionalProperties() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public final Set<Relationship> getRelationships() {
+        return putHdfsRecordRelationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+       return putHdfsRecordProperties;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        final String schemaAccessStrategy = 
validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
+        return 
SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, 
schemaAccessStrategy, getSchemaAccessStrategyValues());
+    }
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) throws 
IOException {
+        super.abstractOnScheduled(context);
+
+        final SchemaRegistry schemaRegistry = 
context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
+
+        final PropertyDescriptor descriptor = 
getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
+        final String schemaAccess = context.getProperty(descriptor).getValue();
+        this.schemaAccessStrategy = 
SchemaAccessUtils.getSchemaAccessStrategy(schemaAccess, schemaRegistry, 
context);
+
+        this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue();
+        this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue();
+
+        // Set umask once, to avoid thread safety issues doing it in onTrigger
+        final PropertyValue umaskProp = context.getProperty(UMASK);
+        final short dfsUmask;
+        if (umaskProp.isSet()) {
+            dfsUmask = Short.parseShort(umaskProp.getValue(), 8);
+        } else {
+            dfsUmask = FsPermission.DEFAULT_UMASK;
+        }
+        final Configuration conf = getConfiguration();
+        FsPermission.setUMask(conf, new FsPermission(dfsUmask));
+    }
+
+    /**
+     * Sub-classes provide the appropriate HDFSRecordWriter.
+     *
+     * @param context the process context to obtain additional configuration
+     * @param flowFile the flow file being written
+     * @param conf the Configuration instance
+     * @param path the path to write to
+     * @param schema the schema for writing
+     * @return the HDFSRecordWriter
+     * @throws IOException if an error occurs creating the writer or 
processing the schema
+     */
+    public abstract HDFSRecordWriter createHDFSRecordWriter(
+            final ProcessContext context,
+            final FlowFile flowFile,
+            final Configuration conf,
+            final Path path,
+            final RecordSchema schema) throws IOException, 
SchemaNotFoundException;
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        // do this before getting a flow file so that we always get a chance 
to attempt Kerberos relogin
+        final FileSystem fileSystem = getFileSystem();
+        final Configuration configuration = getConfiguration();
+        final UserGroupInformation ugi = getUserGroupInformation();
+
+        if (configuration == null || fileSystem == null || ugi == null) {
+            getLogger().error("Processor not configured properly because 
Configuration, FileSystem, or UserGroupInformation was null");
+            context.yield();
+            return;
+        }
+
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            context.yield();
+            return;
+        }
+
+        ugi.doAs((PrivilegedAction<Object>)() -> {
+            Path tempDotCopyFile = null;
+            FlowFile putFlowFile = flowFile;
+            try {
+                final String filenameValue = 
putFlowFile.getAttribute(CoreAttributes.FILENAME.key()); // TODO codec extension
+                final String directoryValue = 
context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
+
+                // create the directory if it doesn't exist
+                final Path directoryPath = new Path(directoryValue);
+                createDirectory(fileSystem, directoryPath, remoteOwner, 
remoteGroup);
+
+                // write to tempFile first and on success rename to destFile
+                final Path tempFile = new Path(directoryPath, "." + 
filenameValue);
+                final Path destFile = new Path(directoryPath, filenameValue);
+
+                final boolean destinationExists = fileSystem.exists(destFile) 
|| fileSystem.exists(tempFile);
+                final boolean shouldOverwrite = 
context.getProperty(OVERWRITE).asBoolean();
+
+                // if the tempFile or destFile already exist, and overwrite is 
set to false, then transfer to failure
+                if (destinationExists && !shouldOverwrite) {
+                    session.transfer(session.penalize(putFlowFile), 
REL_FAILURE);
+                    getLogger().warn("penalizing {} and routing to failure 
because file with same name already exists", new Object[]{putFlowFile});
+                    return null;
+                }
+
+                final AtomicReference<Throwable> exceptionHolder = new 
AtomicReference<>(null);
+                final AtomicReference<WriteResult> writeResult = new 
AtomicReference<>();
+                final RecordReaderFactory recordReaderFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+                final FlowFile flowFileIn = putFlowFile;
+                final StopWatch stopWatch = new StopWatch(true);
+
+                // Read records from the incoming FlowFile and write them the 
tempFile
+                session.read(putFlowFile, (final InputStream rawIn) -> {
+                    RecordReader recordReader = null;
+                    HDFSRecordWriter recordWriter = null;
+
+                    try (final BufferedInputStream in = new 
BufferedInputStream(rawIn)) {
+                        final RecordSchema destRecordSchema = 
schemaAccessStrategy.getSchema(flowFile, in);
+                        recordWriter = createHDFSRecordWriter(context, 
flowFile, configuration, tempFile, destRecordSchema);
+
+                        // if we fail to create the RecordReader then we want 
to route to failure, so we need to
+                        // handle this separately from the other IOExceptions 
which normally route to retry
+                        try {
+                            recordReader = 
recordReaderFactory.createRecordReader(flowFileIn, in, getLogger());
+                        } catch (Exception e) {
+                            final RecordReaderFactoryException rrfe = new 
RecordReaderFactoryException("Unable to create RecordReader", e);
+                            exceptionHolder.set(rrfe);
+                            return;
+                        }
+
+                        final RecordSet recordSet = 
recordReader.createRecordSet();
+                        writeResult.set(recordWriter.write(recordSet));
+
+                    } catch (Exception e) {
+                        exceptionHolder.set(e);
+                    } finally {
+                        IOUtils.closeQuietly(recordReader);
+                        IOUtils.closeQuietly(recordWriter);
+                    }
+                });
+                stopWatch.stop();
+
+                final String dataRate = 
stopWatch.calculateDataRate(putFlowFile.getSize());
+                final long millis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
+                tempDotCopyFile = tempFile;
+
+                // if any errors happened within the session.read then throw 
the exception so we jump
+                // into one of the appropriate catch blocks below
+                if (exceptionHolder.get() != null) {
+                    throw exceptionHolder.get();
+                }
+
+                // Attempt to rename from the tempFile to destFile, and change 
owner if successfully renamed
+                rename(fileSystem, tempFile, destFile);
+                changeOwner(fileSystem, destFile, remoteOwner, remoteGroup);
+
+                getLogger().info("Wrote {} to {} in {} milliseconds at a rate 
of {}", new Object[]{putFlowFile, destFile, millis, dataRate});
+
+                putFlowFile = postProcess(context, session, putFlowFile, 
destFile);
+
+                final String outputPath = destFile.toString();
+                final String newFilename = destFile.getName();
+                final String hdfsPath = destFile.getParent().toString();
+
+                // Update the filename and absolute path attributes
+                final Map<String,String> attributes = new 
HashMap<>(writeResult.get().getAttributes());
+                attributes.put(CoreAttributes.FILENAME.key(), newFilename);
+                attributes.put(ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+                attributes.put(RECORD_COUNT_ATTR, 
String.valueOf(writeResult.get().getRecordCount()));
+                putFlowFile = session.putAllAttributes(putFlowFile, 
attributes);
+
+                // Send a provenance event and transfer to success
+                final String transitUri = (outputPath.startsWith("/")) ? 
"hdfs:/" + outputPath : "hdfs://" + outputPath;
+                session.getProvenanceReporter().send(putFlowFile, transitUri);
+                session.transfer(putFlowFile, REL_SUCCESS);
+
+            } catch (IOException | FlowFileAccessException e) {
+                deleteQuietly(fileSystem, tempDotCopyFile);
+                getLogger().error("Failed to write due to {}", new 
Object[]{e});
+                session.transfer(session.penalize(putFlowFile), REL_RETRY);
+                context.yield();
+            } catch (Throwable t) {
+                deleteQuietly(fileSystem, tempDotCopyFile);
+                getLogger().error("Failed to write due to {}", new 
Object[]{t});
+                session.transfer(putFlowFile, REL_FAILURE);
+            }
+
+            return null;
+        });
+    }
+
+    /**
+     * This method will be called after successfully writing to the 
destination file and renaming the file to it's final name
+     * in order to give sub-classes a chance to take action before 
transferring to success.
+     *
+     * @param context the context
+     * @param session the session
+     * @param flowFile the flow file being processed
+     * @param destFile the destination file written to
+     * @return an updated FlowFile reference
+     */
+    protected FlowFile postProcess(final ProcessContext context, final 
ProcessSession session, final FlowFile flowFile, final Path destFile) {
+        return flowFile;
+    }
+
+    /**
+     * Attempts to rename srcFile to destFile up to 10 times, with a 200ms 
sleep in between each attempt.
+     *
+     * If the file has not been renamed after 10 attempts, a FailureException 
is thrown.
+     *
+     * @param fileSystem the file system where the files are located
+     * @param srcFile the source file
+     * @param destFile the destination file to rename the source to
+     * @throws IOException if IOException happens while attempting to rename
+     * @throws InterruptedException if renaming is interrupted
+     * @throws FailureException if the file couldn't be renamed after 10 
attempts
+     */
+    protected void rename(final FileSystem fileSystem, final Path srcFile, 
final Path destFile) throws IOException, InterruptedException, FailureException 
{
+        boolean renamed = false;
+        for (int i = 0; i < 10; i++) { // try to rename multiple times.
+            if (fileSystem.rename(srcFile, destFile)) {
+                renamed = true;
+                break;// rename was successful
+            }
+            Thread.sleep(200L);// try waiting to let whatever might cause 
rename failure to resolve
+        }
+        if (!renamed) {
+            fileSystem.delete(srcFile, false);
+            throw new FailureException("Could not rename file " + srcFile + " 
to its final filename");
+        }
+    }
+
+    /**
+     * Deletes the given file from the given filesystem. Any exceptions that 
are encountered will be caught and logged, but not thrown.
+     *
+     * @param fileSystem the filesystem where the file exists
+     * @param file the file to delete
+     */
+    protected void deleteQuietly(final FileSystem fileSystem, final Path file) 
{
+        if (file != null) {
+            try {
+                fileSystem.delete(file, false);
+            } catch (Exception e) {
+                getLogger().error("Unable to remove file {} due to {}", new 
Object[]{file, e});
+            }
+        }
+    }
+
+    /**
+     * Changes the ownership of the given file.
+     *
+     * @param fileSystem the filesystem where the file exists
+     * @param path the file to change ownership on
+     * @param remoteOwner the new owner for the file
+     * @param remoteGroup the new group for the file
+     */
+    protected void changeOwner(final FileSystem fileSystem, final Path path, 
final String remoteOwner, final String remoteGroup) {
+        try {
+            // Change owner and group of file if configured to do so
+            if (remoteOwner != null || remoteGroup != null) {
+                fileSystem.setOwner(path, remoteOwner, remoteGroup);
+            }
+        } catch (Exception e) {
+            getLogger().warn("Could not change owner or group of {} on due to 
{}", new Object[]{path, e});
+        }
+    }
+
+    /**
+     * Creates the given directory and changes the ownership to the specified 
owner/group.
+     *
+     * @param fileSystem the filesystem to create the directory on
+     * @param directory the directory to create
+     * @param remoteOwner the owner for changing ownership of the directory
+     * @param remoteGroup the group for changing ownership of the directory
+     * @throws IOException if an error occurs obtaining the file status or 
issuing the mkdir command
+     * @throws FailureException if the directory could not be created
+     */
+    protected void createDirectory(final FileSystem fileSystem, final Path 
directory, final String remoteOwner, final String remoteGroup) throws 
IOException, FailureException {
+        try {
+            if (!fileSystem.getFileStatus(directory).isDirectory()) {
+                throw new FailureException(directory.toString() + " already 
exists and is not a directory");
+            }
+        } catch (FileNotFoundException fe) {
+            if (!fileSystem.mkdirs(directory)) {
+                throw new FailureException(directory.toString() + " could not 
be created");
+            }
+            changeOwner(fileSystem, directory, remoteOwner, remoteGroup);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/FailureException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/FailureException.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/FailureException.java
new file mode 100644
index 0000000..02fa295
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/FailureException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.exception;
+
+/**
+ * An exception to represent an error that occurred during a put to HDFS and 
should not be retried.
+ */
+public class FailureException extends Exception {
+
+    public FailureException(String message) {
+        super(message);
+    }
+
+    public FailureException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/InvalidSchemaException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/InvalidSchemaException.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/InvalidSchemaException.java
new file mode 100644
index 0000000..e0f967f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/InvalidSchemaException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.exception;
+
+/**
+ * Thrown when a schema is unable to be parsed into the expected type.
+ */
+public class InvalidSchemaException extends FailureException {
+
+    public InvalidSchemaException(String message) {
+        super(message);
+    }
+
+    public InvalidSchemaException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/RecordReaderFactoryException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/RecordReaderFactoryException.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/RecordReaderFactoryException.java
new file mode 100644
index 0000000..83a5b41
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/exception/RecordReaderFactoryException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.exception;
+
+/**
+ * Thrown when an error is occurs while using the record reader factory to 
create a record reader.
+ */
+public class RecordReaderFactoryException extends FailureException {
+
+    public RecordReaderFactoryException(String message) {
+        super(message);
+    }
+
+    public RecordReaderFactoryException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordReader.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordReader.java
new file mode 100644
index 0000000..f83d7ff
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordReader.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.record;
+
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Reads Records from HDFS.
+ */
+public interface HDFSRecordReader extends Closeable {
+
+    Record nextRecord() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordWriter.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordWriter.java
new file mode 100644
index 0000000..e21665f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/record/HDFSRecordWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.record;
+
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * Writes Records to HDFS.
+ */
+public interface HDFSRecordWriter extends Closeable {
+
+    /**
+     * @param record the record to write
+     * @throws IOException if an I/O error happens writing the record
+     */
+    void write(Record record) throws IOException;
+
+    /**
+     * @param recordSet the RecordSet to write
+     * @return the result of writing the record set
+     * @throws IOException if an I/O error happens reading from the RecordSet, 
or writing a Record
+     */
+    default WriteResult write(final RecordSet recordSet) throws IOException {
+        int recordCount = 0;
+
+        Record record;
+        while ((record = recordSet.next()) != null) {
+            write(record);
+            recordCount++;
+        }
+
+        return WriteResult.of(recordCount, Collections.emptyMap());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/pom.xml
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/pom.xml
new file mode 100644
index 0000000..3a2ea85
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-record-utils</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+    
+    <artifactId>nifi-mock-record-utils</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <!-- Other modules using nifi-standard-record-utils are expected to 
have this API available, typically through a NAR dependency -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
new file mode 100644
index 0000000..e3ed23e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class MockRecordParser extends AbstractControllerService implements 
RecordReaderFactory {
+    private final List<Object[]> records = new ArrayList<>();
+    private final List<RecordField> fields = new ArrayList<>();
+    private final int failAfterN;
+
+    public MockRecordParser() {
+        this(-1);
+    }
+
+    public MockRecordParser(final int failAfterN) {
+        this.failAfterN = failAfterN;
+    }
+
+
+    public void addSchemaField(final String fieldName, final RecordFieldType 
type) {
+        fields.add(new RecordField(fieldName, type.getDataType()));
+    }
+
+    public void addRecord(Object... values) {
+        records.add(values);
+    }
+
+    @Override
+    public RecordReader createRecordReader(FlowFile flowFile, InputStream in, 
ComponentLog logger) throws IOException, SchemaNotFoundException {
+        final Iterator<Object[]> itr = records.iterator();
+
+        return new RecordReader() {
+            private int recordCount = 0;
+
+            @Override
+            public void close() throws IOException {
+            }
+
+            @Override
+            public Record nextRecord() throws IOException, 
MalformedRecordException {
+                if (failAfterN >= recordCount) {
+                    throw new MalformedRecordException("Intentional Unit Test 
Exception because " + recordCount + " records have been read");
+                }
+                recordCount++;
+
+                if (!itr.hasNext()) {
+                    return null;
+                }
+
+                final Object[] values = itr.next();
+                final Map<String, Object> valueMap = new HashMap<>();
+                int i = 0;
+                for (final RecordField field : fields) {
+                    final String fieldName = field.getFieldName();
+                    valueMap.put(fieldName, values[i++]);
+                }
+
+                return new MapRecord(new SimpleRecordSchema(fields), valueMap);
+            }
+
+            @Override
+            public RecordSchema getSchema() {
+                return new SimpleRecordSchema(fields);
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
new file mode 100644
index 0000000..99c72e4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+
+public class MockRecordWriter extends AbstractControllerService implements 
RecordSetWriterFactory {
+    private final String header;
+    private final int failAfterN;
+    private final boolean quoteValues;
+
+    public MockRecordWriter(final String header) {
+        this(header, true, -1);
+    }
+
+    public MockRecordWriter(final String header, final boolean quoteValues) {
+        this(header, quoteValues, -1);
+    }
+
+    public MockRecordWriter(final String header, final boolean quoteValues, 
final int failAfterN) {
+        this.header = header;
+        this.quoteValues = quoteValues;
+        this.failAfterN = failAfterN;
+    }
+
+    @Override
+    public RecordSetWriter createWriter(final ComponentLog logger, final 
FlowFile flowFile, final InputStream in) {
+        return new RecordSetWriter() {
+            @Override
+            public WriteResult write(final RecordSet rs, final OutputStream 
out) throws IOException {
+                out.write(header.getBytes());
+                out.write("\n".getBytes());
+
+                int recordCount = 0;
+                Record record = null;
+                while ((record = rs.next()) != null) {
+                    if (++recordCount > failAfterN && failAfterN > -1) {
+                        throw new IOException("Unit Test intentionally 
throwing IOException after " + failAfterN + " records were written");
+                    }
+
+                    final int numCols = record.getSchema().getFieldCount();
+
+                    int i = 0;
+                    for (final String fieldName : 
record.getSchema().getFieldNames()) {
+                        final String val = record.getAsString(fieldName);
+                        if (quoteValues) {
+                            out.write("\"".getBytes());
+                            if (val != null) {
+                                out.write(val.getBytes());
+                            }
+                            out.write("\"".getBytes());
+                        } else if (val != null) {
+                            out.write(val.getBytes());
+                        }
+
+                        if (i++ < numCols - 1) {
+                            out.write(",".getBytes());
+                        }
+                    }
+                    out.write("\n".getBytes());
+                }
+
+                return WriteResult.of(recordCount, Collections.emptyMap());
+            }
+
+            @Override
+            public String getMimeType() {
+                return "text/plain";
+            }
+
+            @Override
+            public WriteResult write(Record record, OutputStream out) throws 
IOException {
+                out.write(header.getBytes());
+                out.write("\n".getBytes());
+
+                final int numCols = record.getSchema().getFieldCount();
+                int i = 0;
+                for (final String fieldName : 
record.getSchema().getFieldNames()) {
+                    final String val = record.getAsString(fieldName);
+                    if (quoteValues) {
+                        out.write("\"".getBytes());
+                        out.write(val.getBytes());
+                        out.write("\"".getBytes());
+                    } else {
+                        out.write(val.getBytes());
+                    }
+
+                    if (i++ < numCols - 1) {
+                        out.write(",".getBytes());
+                    }
+                }
+                out.write("\n".getBytes());
+
+                return WriteResult.of(1, Collections.emptyMap());
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml
new file mode 100644
index 0000000..3aa27a7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-record-utils</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+    
+    <artifactId>nifi-standard-record-utils</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <!-- Other modules using nifi-standard-record-utils are expected to 
have these APIs available, typically through a NAR dependency -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
new file mode 100644
index 0000000..d1f0e8a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class HortonworksAttributeSchemaReferenceStrategy implements 
SchemaAccessStrategy {
+    private final Set<SchemaField> schemaFields;
+
+    public static final String SCHEMA_ID_ATTRIBUTE = "schema.identifier";
+    public static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version";
+    public static final String SCHEMA_PROTOCOL_VERSION_ATTRIBUTE = 
"schema.protocol.version";
+
+    private final SchemaRegistry schemaRegistry;
+
+
+    public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry 
schemaRegistry) {
+        this.schemaRegistry = schemaRegistry;
+
+        schemaFields = new HashSet<>();
+        schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
+        schemaFields.add(SchemaField.SCHEMA_VERSION);
+        schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : 
schemaRegistry.getSuppliedSchemaFields());
+    }
+
+    @Override
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream 
contentStream) throws SchemaNotFoundException, IOException {
+        final String schemaIdentifier = 
flowFile.getAttribute(SCHEMA_ID_ATTRIBUTE);
+        final String schemaVersion = 
flowFile.getAttribute(SCHEMA_VERSION_ATTRIBUTE);
+        final String schemaProtocol = 
flowFile.getAttribute(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
+        if (schemaIdentifier == null || schemaVersion == null || 
schemaProtocol == null) {
+            throw new SchemaNotFoundException("Could not determine Schema for 
" + flowFile + " because it is missing one of the following three required 
attributes: "
+                + SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " 
+ SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
+        }
+
+        if (!isNumber(schemaProtocol)) {
+            throw new SchemaNotFoundException("Could not determine Schema for 
" + flowFile + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a 
value of '"
+                + schemaProtocol + "', which is not a valid Protocol Version 
number");
+        }
+
+        final int protocol = Integer.parseInt(schemaProtocol);
+        if (protocol != 1) {
+            throw new SchemaNotFoundException("Could not determine Schema for 
" + flowFile + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a 
value of '"
+                + schemaProtocol + "', which is not a valid Protocol Version 
number. Expected Protocol Version to be 1.");
+        }
+
+        if (!isNumber(schemaIdentifier)) {
+            throw new SchemaNotFoundException("Could not determine Schema for 
" + flowFile + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
+                + schemaProtocol + "', which is not a valid Schema Identifier 
number");
+        }
+
+        if (!isNumber(schemaVersion)) {
+            throw new SchemaNotFoundException("Could not determine Schema for 
" + flowFile + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
+                + schemaProtocol + "', which is not a valid Schema Version 
number");
+        }
+
+        final long schemaId = Long.parseLong(schemaIdentifier);
+        final int version = Integer.parseInt(schemaVersion);
+
+        final RecordSchema schema = schemaRegistry.retrieveSchema(schemaId, 
version);
+        if (schema == null) {
+            throw new SchemaNotFoundException("Could not find a Schema in the 
Schema Registry with Schema Identifier '" + schemaId + "' and Version '" + 
version + "'");
+        }
+
+        return schema;
+    }
+
+    private static boolean isNumber(final String value) {
+        if (value == null) {
+            return false;
+        }
+
+        final String trimmed = value.trim();
+        if (value.isEmpty()) {
+            return false;
+        }
+
+        for (int i = 0; i < trimmed.length(); i++) {
+            final char c = value.charAt(i);
+            if (c > '9' || c < '0') {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        return schemaFields;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
new file mode 100644
index 0000000..dd7d676
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class HortonworksAttributeSchemaReferenceWriter implements 
SchemaAccessWriter {
+    private static final Set<SchemaField> requiredSchemaFields = 
EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
+    private static final int LATEST_PROTOCOL_VERSION = 1;
+
+    @Override
+    public void writeHeader(RecordSchema schema, OutputStream out) throws 
IOException {
+    }
+
+    @Override
+    public Map<String, String> getAttributes(final RecordSchema schema) {
+        final Map<String, String> attributes = new HashMap<>(4);
+        final SchemaIdentifier id = schema.getIdentifier();
+
+        final long schemaId = id.getIdentifier().getAsLong();
+        final int schemaVersion = id.getVersion().getAsInt();
+
+        
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, 
String.valueOf(schemaId));
+        
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE,
 String.valueOf(schemaVersion));
+        
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE,
 String.valueOf(LATEST_PROTOCOL_VERSION));
+
+        return attributes;
+    }
+
+    @Override
+    public void validateSchema(final RecordSchema schema) throws 
SchemaNotFoundException {
+        final SchemaIdentifier id = schema.getIdentifier();
+        if (!id.getIdentifier().isPresent()) {
+            throw new SchemaNotFoundException("Cannot write Schema Reference 
as Attributes because it does not contain a Schema Identifier");
+        }
+        if (!id.getVersion().isPresent()) {
+            throw new SchemaNotFoundException("Cannot write Schema Reference 
as Attributes because it does not contain a Schema Version");
+        }
+    }
+
+    @Override
+    public Set<SchemaField> getRequiredSchemaFields() {
+        return requiredSchemaFields;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
new file mode 100644
index 0000000..a00e322
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class HortonworksEncodedSchemaReferenceStrategy implements 
SchemaAccessStrategy {
+    private static final int LATEST_PROTOCOL_VERSION = 1;
+
+    private final Set<SchemaField> schemaFields;
+    private final SchemaRegistry schemaRegistry;
+
+    public HortonworksEncodedSchemaReferenceStrategy(final SchemaRegistry 
schemaRegistry) {
+        this.schemaRegistry = schemaRegistry;
+
+        schemaFields = new HashSet<>();
+        schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
+        schemaFields.add(SchemaField.SCHEMA_VERSION);
+        schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : 
schemaRegistry.getSuppliedSchemaFields());
+    }
+
+    @Override
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream 
contentStream) throws SchemaNotFoundException, IOException {
+        final byte[] buffer = new byte[13];
+        try {
+            StreamUtils.fillBuffer(contentStream, buffer);
+        } catch (final IOException ioe) {
+            throw new SchemaNotFoundException("Could not read first 13 bytes 
from stream", ioe);
+        }
+
+        // This encoding follows the pattern that is provided for serializing 
data by the Hortonworks Schema Registry serializer
+        // as it is provided at:
+        // 
https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
+        final ByteBuffer bb = ByteBuffer.wrap(buffer);
+        final int protocolVersion = bb.get();
+        if (protocolVersion != 1) {
+            throw new SchemaNotFoundException("Schema Encoding appears to be 
of an incompatible version. The latest known Protocol is Version "
+                + LATEST_PROTOCOL_VERSION + " but the data was encoded with 
version " + protocolVersion);
+        }
+
+        final long schemaId = bb.getLong();
+        final int schemaVersion = bb.getInt();
+
+        return schemaRegistry.retrieveSchema(schemaId, schemaVersion);
+    }
+
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        return schemaFields;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
new file mode 100644
index 0000000..bf6a9ea
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+public class HortonworksEncodedSchemaReferenceWriter implements 
SchemaAccessWriter {
+    private static final Set<SchemaField> requiredSchemaFields = 
EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
+    private static final int LATEST_PROTOCOL_VERSION = 1;
+
+    @Override
+    public void writeHeader(final RecordSchema schema, final OutputStream out) 
throws IOException {
+        final SchemaIdentifier identifier = schema.getIdentifier();
+        final long id = identifier.getIdentifier().getAsLong();
+        final int version = identifier.getVersion().getAsInt();
+
+        // This decoding follows the pattern that is provided for serializing 
data by the Hortonworks Schema Registry serializer
+        // as it is provided at:
+        // 
https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
+        final ByteBuffer bb = ByteBuffer.allocate(13);
+        bb.put((byte) LATEST_PROTOCOL_VERSION);
+        bb.putLong(id);
+        bb.putInt(version);
+
+        out.write(bb.array());
+    }
+
+    @Override
+    public Map<String, String> getAttributes(final RecordSchema schema) {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public void validateSchema(RecordSchema schema) throws 
SchemaNotFoundException {
+        final SchemaIdentifier identifier = schema.getIdentifier();
+        final OptionalLong identifierOption = identifier.getIdentifier();
+        if (!identifierOption.isPresent()) {
+            throw new SchemaNotFoundException("Cannot write Encoded Schema 
Reference because the Schema Identifier is not known");
+        }
+
+        final OptionalInt versionOption = identifier.getVersion();
+        if (!versionOption.isPresent()) {
+            throw new SchemaNotFoundException("Cannot write Encoded Schema 
Reference because the Schema Version is not known");
+        }
+    }
+
+    @Override
+    public Set<SchemaField> getRequiredSchemaFields() {
+        return requiredSchemaFields;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
new file mode 100644
index 0000000..68f9ecf
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Set;
+
+public interface SchemaAccessStrategy {
+    /**
+     * Returns the schema for the given FlowFile using the supplied stream of 
content and configuration
+     *
+     * @param flowFile flowfile
+     * @param contentStream content of flowfile
+     * @return the RecordSchema for the FlowFile
+     */
+    RecordSchema getSchema(FlowFile flowFile, InputStream contentStream) 
throws SchemaNotFoundException, IOException;
+
+    /**
+     * @return the set of all Schema Fields that are supplied by the 
RecordSchema that is returned from {@link #getSchema(FlowFile, InputStream)}.
+     */
+    Set<SchemaField> getSuppliedSchemaFields();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java
new file mode 100644
index 0000000..30a995c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public interface SchemaAccessWriter {
+
+    /**
+     * Writes the given Record Schema to the given OutputStream as header 
information, if appropriate,
+     * or returns without writing anything if the implementation does not need 
to write information to
+     * the contents of the FlowFile
+     *
+     * @param schema the schema to write
+     * @param out the OutputStream to write to
+     * @throws IOException if unable to write to the given stream
+     */
+    void writeHeader(RecordSchema schema, OutputStream out) throws IOException;
+
+    /**
+     * Returns a Map of String to String that represent the attributes that 
should be added to the FlowFile, or
+     * an empty map if no attributes should be added.
+     *
+     * @return a Map of attributes to add to the FlowFile.
+     */
+    Map<String, String> getAttributes(RecordSchema schema);
+
+    /**
+     * Ensures that the given schema can be written by this SchemaAccessWriter 
or throws SchemaNotFoundException if
+     * the schema does not contain sufficient information to be written
+     *
+     * @param schema the schema to validate
+     * @throws SchemaNotFoundException if the schema does not contain 
sufficient information to be written
+     */
+    void validateSchema(RecordSchema schema) throws SchemaNotFoundException;
+
+    /**
+     * Specifies the set of SchemaField's that are required in order to use 
this Schema Access Writer
+     *
+     * @return the set of SchemaField's that are required in order to use this 
Schema Access Writer
+     */
+    Set<SchemaField> getRequiredSchemaFields();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
new file mode 100644
index 0000000..54a248d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+public class SchemaNameAsAttribute implements SchemaAccessWriter {
+    private static final Set<SchemaField> schemaFields = 
EnumSet.of(SchemaField.SCHEMA_NAME);
+    private static final String SCHEMA_NAME_ATTRIBUTE = "schema.name";
+
+    @Override
+    public void writeHeader(final RecordSchema schema, final OutputStream out) 
throws IOException {
+    }
+
+    @Override
+    public Map<String, String> getAttributes(final RecordSchema schema) {
+        final SchemaIdentifier identifier = schema.getIdentifier();
+        final Optional<String> nameOption = identifier.getName();
+        if (nameOption.isPresent()) {
+            return Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, 
nameOption.get());
+        }
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public void validateSchema(final RecordSchema schema) throws 
SchemaNotFoundException {
+        final SchemaIdentifier schemaId = schema.getIdentifier();
+        if (!schemaId.getName().isPresent()) {
+            throw new SchemaNotFoundException("Cannot write Schema Name As 
Attribute because the Schema Name is not known");
+        }
+    }
+
+    @Override
+    public Set<SchemaField> getRequiredSchemaFields() {
+        return schemaFields;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
new file mode 100644
index 0000000..d59e5da
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class SchemaNamePropertyStrategy implements SchemaAccessStrategy {
+    private final Set<SchemaField> schemaFields;
+
+    private final SchemaRegistry schemaRegistry;
+    private final PropertyValue schemaNamePropertyValue;
+
+    public SchemaNamePropertyStrategy(final SchemaRegistry schemaRegistry, 
final PropertyValue schemaNamePropertyValue) {
+        this.schemaRegistry = schemaRegistry;
+        this.schemaNamePropertyValue = schemaNamePropertyValue;
+
+        schemaFields = new HashSet<>();
+        schemaFields.add(SchemaField.SCHEMA_NAME);
+        schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : 
schemaRegistry.getSuppliedSchemaFields());
+    }
+
+    @Override
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream 
contentStream) throws SchemaNotFoundException {
+        final String schemaName = 
schemaNamePropertyValue.evaluateAttributeExpressions(flowFile).getValue();
+        if (schemaName.trim().isEmpty()) {
+            throw new SchemaNotFoundException("FlowFile did not contain 
appropriate attributes to determine Schema Name.");
+        }
+
+        try {
+            final RecordSchema recordSchema = 
schemaRegistry.retrieveSchema(schemaName);
+            if (recordSchema == null) {
+                throw new SchemaNotFoundException("Could not find a schema 
with name '" + schemaName + "' in the configured Schema Registry");
+            }
+
+            return recordSchema;
+        } catch (final Exception e) {
+            throw new SchemaNotFoundException("Could not retrieve schema with 
name '" + schemaName + "' from the configured Schema Registry", e);
+        }
+    }
+
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        return schemaFields;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
new file mode 100644
index 0000000..f39bdca
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class SchemaTextAsAttribute implements SchemaAccessWriter {
+    private static final Set<SchemaField> schemaFields = 
EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
+
+    @Override
+    public void writeHeader(final RecordSchema schema, final OutputStream out) 
{
+    }
+
+    @Override
+    public Map<String, String> getAttributes(final RecordSchema schema) {
+        final Optional<String> textFormatOption = schema.getSchemaFormat();
+        final Optional<String> textOption = schema.getSchemaText();
+        return Collections.singletonMap(textFormatOption.get() + ".schema", 
textOption.get());
+    }
+
+    @Override
+    public void validateSchema(final RecordSchema schema) throws 
SchemaNotFoundException {
+        final Optional<String> textFormatOption = schema.getSchemaFormat();
+        if (!textFormatOption.isPresent()) {
+            throw new SchemaNotFoundException("Cannot write Schema Text as 
Attribute because the Schema's Text Format is not present");
+        }
+
+        final Optional<String> textOption = schema.getSchemaText();
+        if (!textOption.isPresent()) {
+            throw new SchemaNotFoundException("Cannot write Schema Text as 
Attribute because the Schema's Text is not present");
+        }
+    }
+
+    @Override
+    public Set<SchemaField> getRequiredSchemaFields() {
+        return schemaFields;
+    }
+}

Reply via email to