Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2718#discussion_r194572498 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java --- @@ -17,33 +17,61 @@ package org.apache.nifi.avro; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.io.SequenceInputStream; import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; -import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.commons.io.input.TeeInputStream; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.record.RecordSchema; public class AvroReaderWithExplicitSchema extends AvroRecordReader { private final InputStream in; private final RecordSchema recordSchema; private final DatumReader<GenericRecord> datumReader; - private final BinaryDecoder decoder; + private BinaryDecoder decoder; private GenericRecord genericRecord; + private DataFileStream<GenericRecord> dataFileStream; - public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException, SchemaNotFoundException { + public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException { this.in = in; this.recordSchema = recordSchema; - datumReader = new GenericDatumReader<GenericRecord>(avroSchema); - decoder = DecoderFactory.get().binaryDecoder(in, null); + datumReader = new GenericDatumReader<>(avroSchema); + 
ByteArrayOutputStream baos = new ByteArrayOutputStream(); + TeeInputStream teeInputStream = new TeeInputStream(in, baos); + // Try to parse as a DataFileStream, if it works, glue the streams back together and delegate calls to the DataFileStream + try { + dataFileStream = new DataFileStream<>(teeInputStream, new GenericDatumReader<>()); + } catch (IOException ioe) { + // Carry on, hopefully a raw Avro file + // Need to be able to re-read the bytes read so far, and the InputStream passed in doesn't support reset. Use the TeeInputStream in + // conjunction with SequenceInputStream to glue the two streams back together for future reading + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + SequenceInputStream sis = new SequenceInputStream(bais, in); + decoder = DecoderFactory.get().binaryDecoder(sis, null); + } + if (dataFileStream != null) { + // Verify the schemas are the same + Schema embeddedSchema = dataFileStream.getSchema(); + if (!embeddedSchema.equals(avroSchema)) { + throw new IOException("Explicit schema does not match embedded schema"); --- End diff -- @mattyb149 How does it handle schema evolution in this case? It's possible that the Kafka producer has `Corporate Schema v1` and NiFi is configured with `Corporate Schema v2`, where v2 gracefully allows an upgrade from v1 via Avro's schema evolution rules. A strict `equals` check between the embedded and explicit schemas would reject that pairing. Or am I missing something — is schema evolution simply not supported with respect to the Record API?
---