Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2718#discussion_r194590103 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java --- @@ -17,33 +17,61 @@ package org.apache.nifi.avro; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.io.SequenceInputStream; import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; -import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.commons.io.input.TeeInputStream; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.record.RecordSchema; public class AvroReaderWithExplicitSchema extends AvroRecordReader { private final InputStream in; private final RecordSchema recordSchema; private final DatumReader<GenericRecord> datumReader; - private final BinaryDecoder decoder; + private BinaryDecoder decoder; private GenericRecord genericRecord; + private DataFileStream<GenericRecord> dataFileStream; - public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException, SchemaNotFoundException { + public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException { this.in = in; this.recordSchema = recordSchema; - datumReader = new GenericDatumReader<GenericRecord>(avroSchema); - decoder = DecoderFactory.get().binaryDecoder(in, null); + datumReader = new GenericDatumReader<>(avroSchema); + 
ByteArrayOutputStream baos = new ByteArrayOutputStream(); + TeeInputStream teeInputStream = new TeeInputStream(in, baos); + // Try to parse as a DataFileStream, if it works, glue the streams back together and delegate calls to the DataFileStream + try { + dataFileStream = new DataFileStream<>(teeInputStream, new GenericDatumReader<>()); + } catch (IOException ioe) { + // Carry on, hopefully a raw Avro file + // Need to be able to re-read the bytes read so far, and the InputStream passed in doesn't support reset. Use the TeeInputStream in + // conjunction with SequenceInputStream to glue the two streams back together for future reading + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + SequenceInputStream sis = new SequenceInputStream(bais, in); + decoder = DecoderFactory.get().binaryDecoder(sis, null); + } + if (dataFileStream != null) { + // Verify the schemas are the same + Schema embeddedSchema = dataFileStream.getSchema(); + if (!embeddedSchema.equals(avroSchema)) { + throw new IOException("Explicit schema does not match embedded schema"); --- End diff -- I thought schema evolution was supported in other ways such as including optional (possibly missing) fields to support a transition to/from additional/deleted fields, but I admit I don't have my mind wrapped around the whole thing. In this case it was driven by the Avro API: if the file has a schema, there is a much more fluent API to read the records than if it does not. That is not for the case when someone wants to impose a schema on a file that already has a schema; I'm not sure that's a case for schema evolution (i.e. the embedded schema is not correct?); the alternate API is for "raw" Avro files that don't have an embedded schema, and instead need an external one for processing. TBH I don't know how to parse an Avro file that has an embedded schema with their API by imposing an external one. 
This was the middle ground to allow it as long as the external schema matched the embedded one. Thoughts?
---