Repository: nifi
Updated Branches:
  refs/heads/master 4ed7511be -> e7dcb6f6c


http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
index 8553c32..c0160e6 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
@@ -16,6 +16,15 @@
  */
 package org.apache.nifi.record.script;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import javax.script.Invocable;
+import javax.script.ScriptException;
+
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -30,15 +39,6 @@ import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.RecordSchema;
 
-import javax.script.Invocable;
-import javax.script.ScriptException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.util.Collection;
-import java.util.HashSet;
-
 /**
  * A RecordSetWriter implementation that allows the user to script the RecordWriter instance
  */
@@ -149,14 +149,14 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor
     }
 
     @Override
-    public RecordSchema getSchema(FlowFile flowFile, InputStream in) throws 
SchemaNotFoundException, IOException {
+    public RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) 
throws SchemaNotFoundException, IOException {
         final RecordSetWriterFactory writerFactory = recordFactory.get();
         if (writerFactory == null) {
             return null;
         }
 
         try {
-            return writerFactory.getSchema(flowFile, in);
+            return writerFactory.getSchema(flowFile, readSchema);
         } catch (UndeclaredThrowableException ute) {
             throw new IOException(ute.getCause());
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy
index 42a9826..22e984a 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy
@@ -101,9 +101,7 @@ class ScriptedRecordSetWriterTest {
         recordSetWriterFactory.onEnabled configurationContext
 
         MockFlowFile mockFlowFile = new MockFlowFile(1L)
-        InputStream inStream = new ByteArrayInputStream('Flow file content not 
used'.bytes)
-
-               def schema = recordSetWriterFactory.getSchema(mockFlowFile, 
inStream)
+               def schema = recordSetWriterFactory.getSchema(mockFlowFile, 
null)
         
                ByteArrayOutputStream outputStream = new ByteArrayOutputStream()
         RecordSetWriter recordSetWriter = 
recordSetWriterFactory.createWriter(logger, schema, mockFlowFile, outputStream)

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
index c961171..fa4a552 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
@@ -102,7 +102,7 @@ class GroovyRecordSetWriter implements RecordSetWriter {
 class GroovyRecordSetWriterFactory extends AbstractControllerService 
implements RecordSetWriterFactory {
 
     @Override
-    public RecordSchema getSchema(FlowFile flowFile, InputStream inStream) 
throws SchemaNotFoundException, IOException {
+    public RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) 
throws SchemaNotFoundException, IOException {
         return null;
     }
     

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
index 9951a8b..ab1219e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
@@ -17,7 +17,6 @@
 
 package org.apache.nifi.processors.standard;
 
-import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -100,15 +99,6 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
 
         final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-        final RecordSchema writeSchema;
-        try (final InputStream rawIn = session.read(flowFile);
-            final InputStream in = new BufferedInputStream(rawIn)) {
-            writeSchema = writerFactory.getSchema(flowFile, in);
-        } catch (final Exception e) {
-            getLogger().error("Failed to process records for {}; will route to 
failure", new Object[] {flowFile, e});
-            session.transfer(flowFile, REL_FAILURE);
-            return;
-        }
 
         final Map<String, String> attributes = new HashMap<>();
         final AtomicInteger recordCount = new AtomicInteger();
@@ -119,30 +109,31 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
                 @Override
                 public void process(final InputStream in, final OutputStream 
out) throws IOException {
 
-                    try (final RecordReader reader = 
readerFactory.createRecordReader(original, in, getLogger());
-                        final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), writeSchema, original, out)) {
+                    try (final RecordReader reader = 
readerFactory.createRecordReader(original, in, getLogger())) {
 
-                        writer.beginRecordSet();
+                        final RecordSchema writeSchema = 
writerFactory.getSchema(original, reader.getSchema());
+                        try (final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), writeSchema, original, out)) {
+                            writer.beginRecordSet();
 
-                        Record record;
-                        while ((record = reader.nextRecord()) != null) {
-                            final Record processed = 
AbstractRecordProcessor.this.process(record, writeSchema, original, context);
-                            writer.write(processed);
-                        }
-
-                        final WriteResult writeResult = 
writer.finishRecordSet();
-                        attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
-                        attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
-                        attributes.putAll(writeResult.getAttributes());
-                        recordCount.set(writeResult.getRecordCount());
+                            Record record;
+                            while ((record = reader.nextRecord()) != null) {
+                                final Record processed = 
AbstractRecordProcessor.this.process(record, writeSchema, original, context);
+                                writer.write(processed);
+                            }
 
+                            final WriteResult writeResult = 
writer.finishRecordSet();
+                            attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
+                            attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
+                            attributes.putAll(writeResult.getAttributes());
+                            recordCount.set(writeResult.getRecordCount());
+                        }
                     } catch (final SchemaNotFoundException | 
MalformedRecordException e) {
                         throw new ProcessException("Could not parse incoming 
data", e);
                     }
                 }
             });
         } catch (final Exception e) {
-            getLogger().error("Failed to process {}", new Object[] {flowFile, 
e});
+            getLogger().error("Failed to process {}; will route to failure", 
new Object[] {flowFile, e});
             session.transfer(flowFile, REL_FAILURE);
             return;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
index e586a82..6664c7b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
@@ -17,7 +17,6 @@
 
 package org.apache.nifi.processors.standard;
 
-import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -112,15 +111,7 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
 
         final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-        final RecordSchema writeSchema;
-        try (final InputStream rawIn = session.read(flowFile);
-            final InputStream in = new BufferedInputStream(rawIn)) {
-            writeSchema = writerFactory.getSchema(flowFile, in);
-        } catch (final Exception e) {
-            getLogger().error("Failed to process records for {}; will route to 
failure", new Object[] {flowFile, e});
-            session.transfer(flowFile, REL_FAILURE);
-            return;
-        }
+
 
         final AtomicInteger numRecords = new AtomicInteger(0);
         final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers = 
new HashMap<>();
@@ -131,6 +122,8 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
                 public void process(final InputStream in) throws IOException {
                     try (final RecordReader reader = 
readerFactory.createRecordReader(original, in, getLogger())) {
 
+                        final RecordSchema writeSchema = 
writerFactory.getSchema(original, reader.getSchema());
+
                         Record record;
                         while ((record = reader.nextRecord()) != null) {
                             final Set<Relationship> relationships = 
route(record, writeSchema, original, context, flowFileContext);

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java
index e69cd72..56267fe 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java
@@ -17,7 +17,6 @@
 
 package org.apache.nifi.processors.standard;
 
-import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -175,15 +174,6 @@ public class PartitionRecord extends AbstractProcessor {
 
         final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-        final RecordSchema writeSchema;
-        try (final InputStream rawIn = session.read(flowFile);
-            final InputStream in = new BufferedInputStream(rawIn)) {
-            writeSchema = writerFactory.getSchema(flowFile, in);
-        } catch (final Exception e) {
-            getLogger().error("Failed to partition records for {}; will route 
to failure", new Object[] {flowFile, e});
-            session.transfer(flowFile, REL_FAILURE);
-            return;
-        }
 
         final Map<String, RecordPath> recordPaths;
         try {
@@ -203,6 +193,8 @@ public class PartitionRecord extends AbstractProcessor {
         try (final InputStream in = session.read(flowFile)) {
             final RecordReader reader = 
readerFactory.createRecordReader(flowFile, in, getLogger());
 
+            final RecordSchema writeSchema = writerFactory.getSchema(flowFile, 
reader.getSchema());
+
             Record record;
             while ((record = reader.nextRecord()) != null) {
                 final Map<String, List<ValueWrapper>> recordMap = new 
HashMap<>();

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
index 8a9a771..f0ed52d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -72,9 +71,10 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.queryrecord.FlowFileTable;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.ResultSetRecordSet;
@@ -241,19 +241,19 @@ public class QueryRecord extends AbstractProcessor {
 
         final StopWatch stopWatch = new StopWatch(true);
 
-        final RecordSetWriterFactory resultSetWriterFactory = 
context.getProperty(RECORD_WRITER_FACTORY)
-            .asControllerService(RecordSetWriterFactory.class);
-        final RecordReaderFactory recordParserFactory = 
context.getProperty(RECORD_READER_FACTORY)
-            .asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory recordSetWriterFactory = 
context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+        final RecordReaderFactory recordReaderFactory = 
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
 
         final Map<FlowFile, Relationship> transformedFlowFiles = new 
HashMap<>();
         final Set<FlowFile> createdFlowFiles = new HashSet<>();
 
+        // Determine the schema for writing the data
         final RecordSchema recordSchema;
+        try (final InputStream rawIn = session.read(original)) {
+            final RecordReader reader = 
recordReaderFactory.createRecordReader(original, rawIn, getLogger());
+            final RecordSchema inputSchema = reader.getSchema();
 
-        try (final InputStream rawIn = session.read(original);
-            final InputStream in = new BufferedInputStream(rawIn)) {
-            recordSchema = resultSetWriterFactory.getSchema(original, in);
+            recordSchema = recordSetWriterFactory.getSchema(original, 
inputSchema);
         } catch (final Exception e) {
             getLogger().error("Failed to determine Record Schema from {}; 
routing to failure", new Object[] {original, e});
             session.transfer(original, REL_FAILURE);
@@ -281,9 +281,9 @@ public class QueryRecord extends AbstractProcessor {
                     final AtomicReference<WriteResult> writeResultRef = new 
AtomicReference<>();
                     final QueryResult queryResult;
                     if (context.getProperty(CACHE_SCHEMA).asBoolean()) {
-                        queryResult = queryWithCache(session, original, sql, 
context, recordParserFactory);
+                        queryResult = queryWithCache(session, original, sql, 
context, recordReaderFactory);
                     } else {
-                        queryResult = query(session, original, sql, context, 
recordParserFactory);
+                        queryResult = query(session, original, sql, context, 
recordReaderFactory);
                     }
 
                     final AtomicReference<String> mimeTypeRef = new 
AtomicReference<>();
@@ -293,7 +293,7 @@ public class QueryRecord extends AbstractProcessor {
                         transformed = session.write(transformed, new 
OutputStreamCallback() {
                             @Override
                             public void process(final OutputStream out) throws 
IOException {
-                                try (final RecordSetWriter resultSetWriter = 
resultSetWriterFactory.createWriter(getLogger(), recordSchema, outFlowFile, 
out)) {
+                                try (final RecordSetWriter resultSetWriter = 
recordSetWriterFactory.createWriter(getLogger(), recordSchema, outFlowFile, 
out)) {
                                     final ResultSetRecordSet resultSet = new 
ResultSetRecordSet(rs);
                                     
writeResultRef.set(resultSetWriter.write(resultSet));
                                     
mimeTypeRef.set(resultSetWriter.getMimeType());
@@ -362,6 +362,7 @@ public class QueryRecord extends AbstractProcessor {
         session.adjustCounter("Records Read", recordsRead, false);
     }
 
+
     private synchronized CachedStatement getStatement(final String sql, final 
Supplier<CalciteConnection> connectionSupplier, final ProcessSession session,
         final FlowFile flowFile, final RecordReaderFactory 
recordReaderFactory) throws SQLException {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
index 853e88d..00546d2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
@@ -17,7 +17,6 @@
 
 package org.apache.nifi.processors.standard;
 
-import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -132,15 +131,6 @@ public class SplitRecord extends AbstractProcessor {
 
         final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-        final RecordSchema schema;
-        try (final InputStream rawIn = session.read(original);
-            final InputStream in = new BufferedInputStream(rawIn)) {
-            schema = writerFactory.getSchema(original, in);
-        } catch (final Exception e) {
-            getLogger().error("Failed to create Record Writer for {}; routing 
to failure", new Object[] {original, e});
-            session.transfer(original, REL_FAILURE);
-            return;
-        }
 
         final int maxRecords = 
context.getProperty(RECORDS_PER_SPLIT).evaluateAttributeExpressions(original).asInteger();
 
@@ -151,6 +141,8 @@ public class SplitRecord extends AbstractProcessor {
                 public void process(final InputStream in) throws IOException {
                     try (final RecordReader reader = 
readerFactory.createRecordReader(original, in, getLogger())) {
 
+                        final RecordSchema schema = 
writerFactory.getSchema(original, reader.getSchema());
+
                         final RecordSet recordSet = reader.createRecordSet();
                         final PushBackRecordSet pushbackSet = new 
PushBackRecordSet(recordSet);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index c6035f2..0443a11 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -16,6 +16,14 @@
  */
 package org.apache.nifi.processors.standard;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
@@ -38,15 +46,6 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
 public class TestQueryRecord {
 
     static {
@@ -261,7 +260,7 @@ public class TestQueryRecord {
         }
 
         @Override
-        public RecordSchema getSchema(FlowFile flowFile, InputStream content) 
throws SchemaNotFoundException, IOException {
+        public RecordSchema getSchema(FlowFile flowFile, RecordSchema 
readSchema) throws SchemaNotFoundException, IOException {
             final List<RecordField> recordFields = columnNames.stream()
                 .map(name -> new RecordField(name, 
RecordFieldType.STRING.getDataType()))
                 .collect(Collectors.toList());

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
index c819a6c..22b8805 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
@@ -53,11 +53,12 @@ public interface RecordSetWriterFactory extends ControllerService {
      * </p>
      *
      * @param flowFile the FlowFile from which the schema should be determined.
-     * @param content the contents of the FlowFile from which to determine the schema
+     * @param readSchema the schema that was read from the incoming FlowFile, or <code>null</code> if there is no input schema
+     *
      * @return the Schema that should be used for writing Records
      * @throws SchemaNotFoundException if unable to find the schema
      */
-    RecordSchema getSchema(FlowFile flowFile, InputStream content) throws 
SchemaNotFoundException, IOException;
+    RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws 
SchemaNotFoundException, IOException;
 
     /**
      * <p>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
index 25a115d..88b657c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
@@ -89,7 +89,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
         if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {
             return new AvroReaderWithEmbeddedSchema(in);
         } else {
-            final RecordSchema recordSchema = getSchema(flowFile, in);
+            final RecordSchema recordSchema = getSchema(flowFile, in, null);
 
             final Schema avroSchema;
             try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
index 70da1e8..fd09961 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -65,7 +65,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
         try {
             final Schema avroSchema;
             try {
-                if (recordSchema.getSchemaFormat().isPresent() & 
recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) {
+                if (recordSchema.getSchemaFormat().isPresent() && 
recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) {
                     final Optional<String> textOption = 
recordSchema.getSchemaText();
                     if (textOption.isPresent()) {
                         avroSchema = compileAvroSchema(textOption.get());

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
index eba9429..d32e3e5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
@@ -36,7 +36,7 @@ public class EmbeddedAvroSchemaAccessStrategy implements 
SchemaAccessStrategy {
     private final Set<SchemaField> schemaFields = 
EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
 
     @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream 
contentStream) throws SchemaNotFoundException, IOException {
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream 
contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, 
IOException {
         final DataFileStream<GenericRecord> dataFileStream = new 
DataFileStream<>(contentStream, new GenericDatumReader<GenericRecord>());
         final Schema avroSchema = dataFileStream.getSchema();
         final RecordSchema recordSchema = 
AvroTypeUtil.createSchema(avroSchema);

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
index f2b1cbb..fa60b2a 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
@@ -53,7 +53,7 @@ public class CSVHeaderSchemaStrategy implements 
SchemaAccessStrategy {
     }
 
     @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream 
contentStream) throws SchemaNotFoundException {
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream 
contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
         if (this.context == null) {
             throw new SchemaNotFoundException("Schema Access Strategy intended 
only for validation purposes and cannot obtain schema");
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
index dbea3dc..1528052 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -89,7 +89,7 @@ public class CSVReader extends SchemaRegistryService 
implements RecordReaderFact
         // Use Mark/Reset of a BufferedInputStream in case we read from the 
Input Stream for the header.
         final BufferedInputStream bufferedIn = new BufferedInputStream(in);
         bufferedIn.mark(1024 * 1024);
-        final RecordSchema schema = getSchema(flowFile, new 
NonCloseableInputStream(bufferedIn));
+        final RecordSchema schema = getSchema(flowFile, new 
NonCloseableInputStream(bufferedIn), null);
         bufferedIn.reset();
 
         return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, 
dateFormat, timeFormat, timestampFormat);

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index dcf8b5a..fae3eba 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -222,7 +222,7 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
             private final Set<SchemaField> schemaFields = 
EnumSet.noneOf(SchemaField.class);
 
             @Override
-            public RecordSchema getSchema(final FlowFile flowFile, final 
InputStream contentStream) throws SchemaNotFoundException {
+            public RecordSchema getSchema(final FlowFile flowFile, final 
InputStream contentStream, final RecordSchema readSchema) throws 
SchemaNotFoundException {
                 return recordSchema;
             }
 
@@ -235,7 +235,7 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
 
     @Override
     public RecordReader createRecordReader(final FlowFile flowFile, final 
InputStream in, final ComponentLog logger) throws IOException, 
SchemaNotFoundException {
-        final RecordSchema schema = getSchema(flowFile, in);
+        final RecordSchema schema = getSchema(flowFile, in, null);
         return new GrokRecordReader(in, grok, schema, appendUnmatchedLine);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
index 2d11a9b..45cbbd1 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
@@ -129,7 +129,7 @@ public class JsonPathReader extends SchemaRegistryService 
implements RecordReade
 
     @Override
     public RecordReader createRecordReader(final FlowFile flowFile, final 
InputStream in, final ComponentLog logger) throws IOException, 
MalformedRecordException, SchemaNotFoundException {
-        final RecordSchema schema = getSchema(flowFile, in);
+        final RecordSchema schema = getSchema(flowFile, in, null);
         return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, 
dateFormat, timeFormat, timestampFormat);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
index 1dd9834..063c9df 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
@@ -70,6 +70,6 @@ public class JsonTreeReader extends SchemaRegistryService 
implements RecordReade
 
     @Override
     public RecordReader createRecordReader(final FlowFile flowFile, final 
InputStream in, final ComponentLog logger) throws IOException, 
MalformedRecordException, SchemaNotFoundException {
-        return new JsonTreeRowRecordReader(in, logger, getSchema(flowFile, 
in), dateFormat, timeFormat, timestampFormat);
+        return new JsonTreeRowRecordReader(in, logger, getSchema(flowFile, in, 
null), dateFormat, timeFormat, timestampFormat);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
index 7b60815..0acf6ff 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
@@ -17,6 +17,10 @@
 
 package org.apache.nifi.serialization;
 
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.INHERIT_RECORD_SCHEMA;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -34,11 +38,12 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceWriter;
 import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceWriter;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
 import org.apache.nifi.schema.access.SchemaAccessWriter;
 import org.apache.nifi.schema.access.SchemaField;
 import org.apache.nifi.schema.access.SchemaNameAsAttribute;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.schema.access.SchemaTextAsAttribute;
+import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy;
 import org.apache.nifi.serialization.record.RecordSchema;
 
 public abstract class SchemaRegistryRecordSetWriter extends 
SchemaRegistryService {
@@ -58,6 +63,7 @@ public abstract class SchemaRegistryRecordSetWriter extends 
SchemaRegistryServic
     static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new 
AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
         "The FlowFile will be given a set of 3 attributes to describe the 
schema: 'schema.identifier', 'schema.version', and 'schema.protocol.version'. 
Note that if "
             + "the schema for a record does not contain the necessary 
identifier and version, an Exception will be thrown when attempting to write 
the data.");
+    static final AllowableValue NO_SCHEMA = new AllowableValue("no-schema", 
"Do Not Write Schema", "Do not add any schema-related information to the 
FlowFile.");
 
     /**
      * This constant is just a base spec for the actual PropertyDescriptor.
@@ -67,7 +73,7 @@ public abstract class SchemaRegistryRecordSetWriter extends 
SchemaRegistryServic
     private static final PropertyDescriptor SCHEMA_WRITE_STRATEGY = new 
PropertyDescriptor.Builder()
         .name("Schema Write Strategy")
         .description("Specifies how the schema for a Record should be added to 
the data.")
-        .allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, 
HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
+        .allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, 
HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA)
         .defaultValue(SCHEMA_NAME_ATTRIBUTE.getValue())
         .required(true)
         .build();
@@ -76,7 +82,10 @@ public abstract class SchemaRegistryRecordSetWriter extends 
SchemaRegistryServic
     private volatile ConfigurationContext configurationContext;
     private volatile SchemaAccessWriter schemaAccessWriter;
 
-    private final List<AllowableValue> strategyList = 
Collections.unmodifiableList(Arrays.asList(SCHEMA_NAME_ATTRIBUTE, 
AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA));
+    private final List<AllowableValue> schemaWriteStrategyList = 
Collections.unmodifiableList(Arrays.asList(
+        SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, 
HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA));
+    private final List<AllowableValue> schemaAccessStrategyList = 
Collections.unmodifiableList(Arrays.asList(
+        SCHEMA_NAME_PROPERTY, INHERIT_RECORD_SCHEMA, SCHEMA_TEXT_PROPERTY));
 
 
     @Override
@@ -98,6 +107,11 @@ public abstract class SchemaRegistryRecordSetWriter extends 
SchemaRegistryServic
         return SCHEMA_NAME_ATTRIBUTE;
     }
 
+    @Override
+    protected AllowableValue getDefaultSchemaAccessStrategy() {
+        return INHERIT_RECORD_SCHEMA;
+    }
+
     protected PropertyDescriptor getSchemaWriteStrategyDescriptor() {
         return getPropertyDescriptor(SCHEMA_WRITE_STRATEGY.getName());
     }
@@ -121,7 +135,12 @@ public abstract class SchemaRegistryRecordSetWriter 
extends SchemaRegistryServic
     }
 
     protected List<AllowableValue> getSchemaWriteStrategyValues() {
-        return strategyList;
+        return schemaWriteStrategyList;
+    }
+
+    @Override
+    protected List<AllowableValue> getSchemaAccessStrategyValues() {
+        return schemaAccessStrategyList;
     }
 
     protected SchemaAccessWriter getSchemaWriteStrategy(final String 
allowableValue) {
@@ -132,11 +151,13 @@ public abstract class SchemaRegistryRecordSetWriter 
extends SchemaRegistryServic
         if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) 
{
             return new SchemaNameAsAttribute();
         } else if 
(allowableValue.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) {
-            return new SchemaTextAsAttribute();
+            return new WriteAvroSchemaAttributeStrategy();
         } else if 
(allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
             return new HortonworksEncodedSchemaReferenceWriter();
         } else if 
(allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
             return new HortonworksAttributeSchemaReferenceWriter();
+        } else if (allowableValue.equalsIgnoreCase(NO_SCHEMA.getValue())) {
+            return new NopSchemaAccessWriter();
         }
 
         return null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
index b2c683d..ddcfb0c 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
@@ -32,6 +32,7 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -55,8 +56,10 @@ public abstract class SchemaRegistryService extends 
AbstractControllerService {
 
     private volatile ConfigurationContext configurationContext;
     private volatile SchemaAccessStrategy schemaAccessStrategy;
+    private static final InputStream EMPTY_INPUT_STREAM = new 
ByteArrayInputStream(new byte[0]);
 
-    private final List<AllowableValue> strategyList = 
Collections.unmodifiableList(Arrays.asList(SCHEMA_NAME_PROPERTY, 
SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA));
+    private final List<AllowableValue> strategyList = 
Collections.unmodifiableList(Arrays.asList(
+        SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, 
HWX_CONTENT_ENCODED_SCHEMA));
 
     protected PropertyDescriptor getSchemaAcessStrategyDescriptor() {
         return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
@@ -95,6 +98,7 @@ public abstract class SchemaRegistryService extends 
AbstractControllerService {
         this.schemaAccessStrategy = getSchemaAccessStrategy(schemaAccess, 
schemaRegistry, context);
     }
 
+    @Override
     protected ConfigurationContext getConfigurationContext() {
         return configurationContext;
     }
@@ -103,13 +107,22 @@ public abstract class SchemaRegistryService extends 
AbstractControllerService {
         return schemaAccessStrategy;
     }
 
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream 
contentStream) throws SchemaNotFoundException, IOException {
+    public final RecordSchema getSchema(final FlowFile flowFile, final 
InputStream contentStream, final RecordSchema readSchema) throws 
SchemaNotFoundException, IOException {
+        final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy();
+        if (accessStrategy == null) {
+            throw new SchemaNotFoundException("Could not determine the Schema 
Access Strategy for this service");
+        }
+
+        return getSchemaAccessStrategy().getSchema(flowFile, contentStream, 
readSchema);
+    }
+
+    public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema 
readSchema) throws SchemaNotFoundException, IOException {
         final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy();
         if (accessStrategy == null) {
             throw new SchemaNotFoundException("Could not determine the Schema 
Access Strategy for this service");
         }
 
-        return getSchemaAccessStrategy().getSchema(flowFile, contentStream);
+        return getSchemaAccessStrategy().getSchema(flowFile, 
EMPTY_INPUT_STREAM, readSchema);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
index e6a9054..7057e43 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
@@ -18,11 +18,9 @@
 package org.apache.nifi.text;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -38,7 +36,6 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter;
-import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.RecordSchema;
 
 @Tags({"text", "freeform", "expression", "language", "el", "record", 
"recordset", "resultset", "writer", "serialize"})
@@ -46,8 +43,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
     + "text is able to make use of the Expression Language to reference each 
of the fields that are available "
     + "in a Record. Each record in the RecordSet will be separated by a single 
newline character.")
 public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter 
implements RecordSetWriterFactory {
-    private static final RecordSchema EMPTY_SCHEMA = new 
SimpleRecordSchema(Collections.emptyList());
-
     static final PropertyDescriptor TEXT = new PropertyDescriptor.Builder()
         .name("Text")
         .description("The text to use when writing the results. This property 
will evaluate the Expression Language using any of the fields available in a 
Record.")
@@ -87,7 +82,7 @@ public class FreeFormTextRecordSetWriter extends 
SchemaRegistryRecordSetWriter i
     }
 
     @Override
-    public RecordSchema getSchema(final FlowFile flowFile, final InputStream 
contentStream) throws SchemaNotFoundException, IOException {
-        return EMPTY_SCHEMA;
+    public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema 
readSchema) throws SchemaNotFoundException, IOException {
+        return readSchema;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
index c25f0aa..33c0857 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
@@ -27,7 +27,7 @@ import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
-import org.apache.nifi.schema.access.SchemaTextAsAttribute;
+import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.WriteResult;
 import org.junit.Assert;
@@ -36,7 +36,7 @@ public class TestWriteAvroResultWithoutSchema extends 
TestWriteAvroResult {
 
     @Override
     protected RecordSetWriter createWriter(final Schema schema, final 
OutputStream out) throws IOException {
-        return new WriteAvroResultWithExternalSchema(schema, 
AvroTypeUtil.createSchema(schema), new SchemaTextAsAttribute(), out);
+        return new WriteAvroResultWithExternalSchema(schema, 
AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java
index 20548db..e9de978 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java
@@ -56,7 +56,7 @@ public class TestCSVHeaderSchemaStrategy {
 
         final RecordSchema schema;
         try (final InputStream bais = new ByteArrayInputStream(headerBytes)) {
-            schema = strategy.getSchema(null, bais);
+            schema = strategy.getSchema(null, bais, null);
         }
 
         final List<String> expectedFieldNames = Arrays.asList("a", "b", "c", 
"d", "e,z", "f");

Reply via email to