Repository: nifi Updated Branches: refs/heads/master 4ed7511be -> e7dcb6f6c
http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java index 8553c32..c0160e6 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java @@ -16,6 +16,15 @@ */ package org.apache.nifi.record.script; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.Collection; +import java.util.HashSet; + +import javax.script.Invocable; +import javax.script.ScriptException; + import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -30,15 +39,6 @@ import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.RecordSchema; -import javax.script.Invocable; -import javax.script.ScriptException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.UndeclaredThrowableException; -import java.util.Collection; -import java.util.HashSet; - /** * A RecordSetWriter implementation that allows the user to script the RecordWriter instance */ @@ -149,14 +149,14 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor } @Override - public RecordSchema getSchema(FlowFile flowFile, InputStream in) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException { final RecordSetWriterFactory writerFactory = recordFactory.get(); if (writerFactory == null) { return null; } try { - return writerFactory.getSchema(flowFile, in); + return writerFactory.getSchema(flowFile, readSchema); } catch (UndeclaredThrowableException ute) { throw new IOException(ute.getCause()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy index 42a9826..22e984a 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy @@ -101,9 +101,7 @@ class ScriptedRecordSetWriterTest { recordSetWriterFactory.onEnabled configurationContext MockFlowFile mockFlowFile = new MockFlowFile(1L) - InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes) - - def schema = recordSetWriterFactory.getSchema(mockFlowFile, inStream) + def schema = recordSetWriterFactory.getSchema(mockFlowFile, null) ByteArrayOutputStream outputStream = new ByteArrayOutputStream() RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema, mockFlowFile, outputStream) http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy index c961171..fa4a552 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy @@ -102,7 +102,7 @@ class GroovyRecordSetWriter implements RecordSetWriter { class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory { @Override - public RecordSchema getSchema(FlowFile flowFile, InputStream inStream) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException { return null; } http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java index 9951a8b..ab1219e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.standard; -import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -100,15 +99,6 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor { final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - final RecordSchema writeSchema; - try (final InputStream rawIn = session.read(flowFile); - final InputStream in = new BufferedInputStream(rawIn)) { - writeSchema = writerFactory.getSchema(flowFile, in); - } catch (final Exception e) { - getLogger().error("Failed to process records for {}; will route to failure", new Object[] {flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - return; - } final Map<String, String> attributes = new HashMap<>(); final AtomicInteger recordCount = new AtomicInteger(); @@ -119,30 +109,31 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor { @Override public void process(final InputStream in, final OutputStream out) throws IOException { - try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger()); - final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, original, out)) { + try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { - writer.beginRecordSet(); + final RecordSchema writeSchema = writerFactory.getSchema(original, reader.getSchema()); + try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, original, out)) { + writer.beginRecordSet(); - Record record; - while ((record = reader.nextRecord()) != null) { - final Record processed = AbstractRecordProcessor.this.process(record, writeSchema, original, context); - writer.write(processed); - } - - final WriteResult writeResult = writer.finishRecordSet(); - attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); - attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); - attributes.putAll(writeResult.getAttributes()); - recordCount.set(writeResult.getRecordCount()); + Record record; + while ((record = reader.nextRecord()) != null) { + final Record processed = AbstractRecordProcessor.this.process(record, writeSchema, original, context); + writer.write(processed); + } + final WriteResult writeResult = writer.finishRecordSet(); + attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + recordCount.set(writeResult.getRecordCount()); + } } catch (final SchemaNotFoundException | MalformedRecordException e) { throw new ProcessException("Could not parse incoming data", e); } } }); } catch (final Exception e) { - getLogger().error("Failed to process {}", new Object[] {flowFile, e}); + getLogger().error("Failed to process {}; will route to failure", new Object[] {flowFile, e}); session.transfer(flowFile, REL_FAILURE); return; } http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java index e586a82..6664c7b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.standard; -import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -112,15 +111,7 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor { final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - final RecordSchema writeSchema; - try (final InputStream rawIn = session.read(flowFile); - final InputStream in = new BufferedInputStream(rawIn)) { - writeSchema = writerFactory.getSchema(flowFile, in); - } catch (final Exception e) { - getLogger().error("Failed to process records for {}; will route to failure", new Object[] {flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - return; - } + final AtomicInteger numRecords = new AtomicInteger(0); final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>(); @@ -131,6 +122,8 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor { public void process(final InputStream in) throws IOException { try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { + final RecordSchema writeSchema = writerFactory.getSchema(original, reader.getSchema()); + Record record; while ((record = reader.nextRecord()) != null) { final Set<Relationship> relationships = route(record, writeSchema, original, context, flowFileContext); http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java index e69cd72..56267fe 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.standard; -import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -175,15 +174,6 @@ public class PartitionRecord extends AbstractProcessor { final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - final RecordSchema writeSchema; - try (final InputStream rawIn = session.read(flowFile); - final InputStream in = new BufferedInputStream(rawIn)) { - writeSchema = writerFactory.getSchema(flowFile, in); - } catch (final Exception e) { - getLogger().error("Failed to partition records for {}; will route to failure", new Object[] {flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - return; - } final Map<String, RecordPath> recordPaths; try { @@ -203,6 +193,8 @@ public class PartitionRecord extends AbstractProcessor { try (final InputStream in = session.read(flowFile)) { final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger()); + final RecordSchema writeSchema = writerFactory.getSchema(flowFile, reader.getSchema()); + Record record; while ((record = reader.nextRecord()) != null) { final Map<String, List<ValueWrapper>> recordMap = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java index 8a9a771..f0ed52d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.processors.standard; -import java.io.BufferedInputStream; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; @@ -72,9 +71,10 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.queryrecord.FlowFileTable; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; -import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.ResultSetRecordSet; @@ -241,19 +241,19 @@ public class QueryRecord extends AbstractProcessor { final StopWatch stopWatch = new StopWatch(true); - final RecordSetWriterFactory resultSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY) - .asControllerService(RecordSetWriterFactory.class); - final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY) - .asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class); + final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class); final Map<FlowFile, Relationship> transformedFlowFiles = new HashMap<>(); final Set<FlowFile> createdFlowFiles = new HashSet<>(); + // Determine the schema for writing the data final RecordSchema recordSchema; + try (final InputStream rawIn = session.read(original)) { + final RecordReader reader = recordReaderFactory.createRecordReader(original, rawIn, getLogger()); + final RecordSchema inputSchema = reader.getSchema(); - try (final InputStream rawIn = session.read(original); - final InputStream in = new BufferedInputStream(rawIn)) { - recordSchema = resultSetWriterFactory.getSchema(original, in); + recordSchema = recordSetWriterFactory.getSchema(original, inputSchema); } catch (final Exception e) { getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e}); session.transfer(original, REL_FAILURE); @@ -281,9 +281,9 @@ public class QueryRecord extends AbstractProcessor { final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>(); final QueryResult queryResult; if (context.getProperty(CACHE_SCHEMA).asBoolean()) { - queryResult = queryWithCache(session, original, sql, context, recordParserFactory); + queryResult = queryWithCache(session, original, sql, context, recordReaderFactory); } else { - queryResult = query(session, original, sql, context, recordParserFactory); + queryResult = query(session, original, sql, context, recordReaderFactory); } final AtomicReference<String> mimeTypeRef = new AtomicReference<>(); @@ -293,7 +293,7 @@ public class QueryRecord extends AbstractProcessor { transformed = session.write(transformed, new OutputStreamCallback() { @Override public void process(final OutputStream out) throws IOException { - try (final RecordSetWriter resultSetWriter = resultSetWriterFactory.createWriter(getLogger(), recordSchema, outFlowFile, out)) { + try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, outFlowFile, out)) { final ResultSetRecordSet resultSet = new ResultSetRecordSet(rs); writeResultRef.set(resultSetWriter.write(resultSet)); mimeTypeRef.set(resultSetWriter.getMimeType()); @@ -362,6 +362,7 @@ public class QueryRecord extends AbstractProcessor { session.adjustCounter("Records Read", recordsRead, false); } + private synchronized CachedStatement getStatement(final String sql, final Supplier<CalciteConnection> connectionSupplier, final ProcessSession session, final FlowFile flowFile, final RecordReaderFactory recordReaderFactory) throws SQLException { http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java index 853e88d..00546d2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.standard; -import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -132,15 +131,6 @@ public class SplitRecord extends AbstractProcessor { final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - final RecordSchema schema; - try (final InputStream rawIn = session.read(original); - final InputStream in = new BufferedInputStream(rawIn)) { - schema = writerFactory.getSchema(original, in); - } catch (final Exception e) { - getLogger().error("Failed to create Record Writer for {}; routing to failure", new Object[] {original, e}); - session.transfer(original, REL_FAILURE); - return; - } final int maxRecords = context.getProperty(RECORDS_PER_SPLIT).evaluateAttributeExpressions(original).asInteger(); @@ -151,6 +141,8 @@ public class SplitRecord extends AbstractProcessor { public void process(final InputStream in) throws IOException { try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { + final RecordSchema schema = writerFactory.getSchema(original, reader.getSchema()); + final RecordSet recordSet = reader.createRecordSet(); final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet); http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java index c6035f2..0443a11 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java @@ -16,6 +16,14 @@ */ package org.apache.nifi.processors.standard; +import java.io.IOException; +import java.io.OutputStream; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; @@ -38,15 +46,6 @@ import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Test; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - public class TestQueryRecord { static { @@ -261,7 +260,7 @@ public class TestQueryRecord { } @Override - public RecordSchema getSchema(FlowFile flowFile, InputStream content) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException { final List<RecordField> recordFields = columnNames.stream() .map(name -> new RecordField(name, RecordFieldType.STRING.getDataType())) .collect(Collectors.toList()); http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java index c819a6c..22b8805 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java @@ -53,11 +53,12 @@ public interface RecordSetWriterFactory extends ControllerService { * </p> * * @param flowFile the FlowFile from which the schema should be determined. - * @param content the contents of the FlowFile from which to determine the schema + * @param readSchema the schema that was read from the incoming FlowFile, or <code>null</code> if there is no input schema + * * @return the Schema that should be used for writing Records * @throws SchemaNotFoundException if unable to find the schema */ - RecordSchema getSchema(FlowFile flowFile, InputStream content) throws SchemaNotFoundException, IOException; + RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException; /** * <p> http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java index 25a115d..88b657c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java @@ -89,7 +89,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) { return new AvroReaderWithEmbeddedSchema(in); } else { - final RecordSchema recordSchema = getSchema(flowFile, in); + final RecordSchema recordSchema = getSchema(flowFile, in, null); final Schema avroSchema; try { http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java index 70da1e8..fd09961 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -65,7 +65,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement try { final Schema avroSchema; try { - if (recordSchema.getSchemaFormat().isPresent() & recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) { + if (recordSchema.getSchemaFormat().isPresent() && recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) { final Optional<String> textOption = recordSchema.getSchemaText(); if (textOption.isPresent()) { avroSchema = compileAvroSchema(textOption.get()); http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java index eba9429..d32e3e5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java @@ -36,7 +36,7 @@ public class EmbeddedAvroSchemaAccessStrategy implements SchemaAccessStrategy { private final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT); @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(contentStream, new GenericDatumReader<GenericRecord>()); final Schema avroSchema = dataFileStream.getSchema(); final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema); http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java index f2b1cbb..fa60b2a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java @@ -53,7 +53,7 @@ public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy { } @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException { + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException { if (this.context == null) { throw new SchemaNotFoundException("Schema Access Strategy intended only for validation purposes and cannot obtain schema"); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java index dbea3dc..1528052 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java @@ -89,7 +89,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact // Use Mark/Reset of a BufferedInputStream in case we read from the Input Stream for the header. final BufferedInputStream bufferedIn = new BufferedInputStream(in); bufferedIn.mark(1024 * 1024); - final RecordSchema schema = getSchema(flowFile, new NonCloseableInputStream(bufferedIn)); + final RecordSchema schema = getSchema(flowFile, new NonCloseableInputStream(bufferedIn), null); bufferedIn.reset(); return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, dateFormat, timeFormat, timestampFormat); http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index dcf8b5a..fae3eba 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -222,7 +222,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class); @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException { + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException { return recordSchema; } @@ -235,7 +235,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac @Override public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { - final RecordSchema schema = getSchema(flowFile, in); + final RecordSchema schema = getSchema(flowFile, in, null); return new GrokRecordReader(in, grok, schema, appendUnmatchedLine); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java index 2d11a9b..45cbbd1 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java @@ -129,7 +129,7 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade @Override public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { - final RecordSchema schema = getSchema(flowFile, in); + final RecordSchema schema = getSchema(flowFile, in, null); return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java index 1dd9834..063c9df 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java @@ -70,6 +70,6 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade @Override public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { - return new JsonTreeRowRecordReader(in, logger, getSchema(flowFile, in), dateFormat, timeFormat, timestampFormat); + return new JsonTreeRowRecordReader(in, logger, getSchema(flowFile, in, null), dateFormat, timeFormat, timestampFormat); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java index 7b60815..0acf6ff 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java @@ -17,6 +17,10 @@ package org.apache.nifi.serialization; +import static org.apache.nifi.schema.access.SchemaAccessUtils.INHERIT_RECORD_SCHEMA; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -34,11 +38,12 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceWriter; import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceWriter; +import org.apache.nifi.schema.access.NopSchemaAccessWriter; import org.apache.nifi.schema.access.SchemaAccessWriter; import org.apache.nifi.schema.access.SchemaField; import org.apache.nifi.schema.access.SchemaNameAsAttribute; import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.schema.access.SchemaTextAsAttribute; +import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy; import org.apache.nifi.serialization.record.RecordSchema; public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryService { @@ -58,6 +63,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes", "The FlowFile will be given a set of 3 attributes to describe the schema: 'schema.identifier', 'schema.version', and 'schema.protocol.version'. Note that if " + "the schema for a record does not contain the necessary identifier and version, an Exception will be thrown when attempting to write the data."); + static final AllowableValue NO_SCHEMA = new AllowableValue("no-schema", "Do Not Write Schema", "Do not add any schema-related information to the FlowFile."); /** * This constant is just a base spec for the actual PropertyDescriptor. @@ -67,7 +73,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic private static final PropertyDescriptor SCHEMA_WRITE_STRATEGY = new PropertyDescriptor.Builder() .name("Schema Write Strategy") .description("Specifies how the schema for a Record should be added to the data.") - .allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA) + .allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA) .defaultValue(SCHEMA_NAME_ATTRIBUTE.getValue()) .required(true) .build(); @@ -76,7 +82,10 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic private volatile ConfigurationContext configurationContext; private volatile SchemaAccessWriter schemaAccessWriter; - private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)); + private final List<AllowableValue> schemaWriteStrategyList = Collections.unmodifiableList(Arrays.asList( + SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA)); + private final List<AllowableValue> schemaAccessStrategyList = Collections.unmodifiableList(Arrays.asList( + SCHEMA_NAME_PROPERTY, INHERIT_RECORD_SCHEMA, SCHEMA_TEXT_PROPERTY)); @Override @@ -98,6 +107,11 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic return SCHEMA_NAME_ATTRIBUTE; } + @Override + protected AllowableValue getDefaultSchemaAccessStrategy() { + return INHERIT_RECORD_SCHEMA; + } + protected PropertyDescriptor getSchemaWriteStrategyDescriptor() { return getPropertyDescriptor(SCHEMA_WRITE_STRATEGY.getName()); } @@ -121,7 +135,12 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic } protected List<AllowableValue> getSchemaWriteStrategyValues() { - return strategyList; + return schemaWriteStrategyList; + } + + @Override + protected List<AllowableValue> getSchemaAccessStrategyValues() { + return schemaAccessStrategyList; } protected SchemaAccessWriter getSchemaWriteStrategy(final String allowableValue) { @@ -132,11 +151,13 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) { return new SchemaNameAsAttribute(); } else if (allowableValue.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) { - return new SchemaTextAsAttribute(); + return new WriteAvroSchemaAttributeStrategy(); } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) { return new HortonworksEncodedSchemaReferenceWriter(); } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) { return new HortonworksAttributeSchemaReferenceWriter(); + } else if (allowableValue.equalsIgnoreCase(NO_SCHEMA.getValue())) { + return new NopSchemaAccessWriter(); } return null; http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java index b2c683d..ddcfb0c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java @@ -32,6 +32,7 @@ import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -55,8 +56,10 @@ public abstract class SchemaRegistryService extends AbstractControllerService { private volatile ConfigurationContext configurationContext; private volatile SchemaAccessStrategy schemaAccessStrategy; + private static final InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]); - private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)); + private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList( + SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)); protected PropertyDescriptor getSchemaAcessStrategyDescriptor() { return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName()); @@ -95,6 +98,7 @@ public abstract class SchemaRegistryService extends AbstractControllerService { this.schemaAccessStrategy = getSchemaAccessStrategy(schemaAccess, schemaRegistry, context); } + @Override protected ConfigurationContext getConfigurationContext() { return configurationContext; } @@ -103,13 +107,22 @@ public abstract class SchemaRegistryService extends AbstractControllerService { return schemaAccessStrategy; } - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException { + public final RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { + final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy(); + if (accessStrategy == null) { + throw new SchemaNotFoundException("Could not determine the Schema Access Strategy for this service"); + } + + return getSchemaAccessStrategy().getSchema(flowFile, contentStream, readSchema); + } + + public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy(); if (accessStrategy == null) { throw new SchemaNotFoundException("Could not determine the Schema Access Strategy for this service"); } - return getSchemaAccessStrategy().getSchema(flowFile, contentStream); + return getSchemaAccessStrategy().getSchema(flowFile, EMPTY_INPUT_STREAM, readSchema); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java index e6a9054..7057e43 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java @@ -18,11 +18,9 @@ package org.apache.nifi.text; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.Charset; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -38,7 +36,6 @@ import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter; -import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.RecordSchema; @Tags({"text", "freeform", "expression", "language", "el", "record", "recordset", "resultset", "writer", "serialize"}) @@ -46,8 +43,6 @@ import org.apache.nifi.serialization.record.RecordSchema; + "text is able to make use of the Expression Language to reference each of the fields that are available " + "in a Record. Each record in the RecordSet will be separated by a single newline character.") public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter implements RecordSetWriterFactory { - private static final RecordSchema EMPTY_SCHEMA = new SimpleRecordSchema(Collections.emptyList()); - static final PropertyDescriptor TEXT = new PropertyDescriptor.Builder() .name("Text") .description("The text to use when writing the results. This property will evaluate the Expression Language using any of the fields available in a Record.") @@ -87,7 +82,7 @@ public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter i } @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException { - return EMPTY_SCHEMA; + public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { + return readSchema; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java index c25f0aa..33c0857 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java @@ -27,7 +27,7 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; -import org.apache.nifi.schema.access.SchemaTextAsAttribute; +import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.WriteResult; import org.junit.Assert; @@ -36,7 +36,7 @@ public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult { @Override protected RecordSetWriter createWriter(final Schema schema, final OutputStream out) throws IOException { - return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new SchemaTextAsAttribute(), out); + return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/e7dcb6f6/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java index 20548db..e9de978 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java @@ -56,7 +56,7 @@ public class TestCSVHeaderSchemaStrategy { final RecordSchema schema; try (final InputStream bais = new ByteArrayInputStream(headerBytes)) { - schema = strategy.getSchema(null, bais); + schema = strategy.getSchema(null, bais, null); } final List<String> expectedFieldNames = Arrays.asList("a", "b", "c", "d", "e,z", "f");
