Github user bdesert commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2718#discussion_r189422180
--- Diff:
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
---
@@ -17,33 +17,61 @@
package org.apache.nifi.avro;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.io.SequenceInputStream;
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.commons.io.input.TeeInputStream;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.record.RecordSchema;
public class AvroReaderWithExplicitSchema extends AvroRecordReader {
private final InputStream in;
private final RecordSchema recordSchema;
private final DatumReader<GenericRecord> datumReader;
- private final BinaryDecoder decoder;
+ private BinaryDecoder decoder;
private GenericRecord genericRecord;
+ private DataFileStream<GenericRecord> dataFileStream;
- public AvroReaderWithExplicitSchema(final InputStream in, final
RecordSchema recordSchema, final Schema avroSchema) throws IOException,
SchemaNotFoundException {
+ public AvroReaderWithExplicitSchema(final InputStream in, final
RecordSchema recordSchema, final Schema avroSchema) throws IOException {
this.in = in;
this.recordSchema = recordSchema;
- datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
- decoder = DecoderFactory.get().binaryDecoder(in, null);
+ datumReader = new GenericDatumReader<>(avroSchema);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ TeeInputStream teeInputStream = new TeeInputStream(in, baos);
+ // Try to parse as a DataFileStream, if it works, glue the streams
back together and delegate calls to the DataFileStream
+ try {
+ dataFileStream = new DataFileStream<>(teeInputStream, new
GenericDatumReader<>());
+ } catch (IOException ioe) {
+ // Carry on, hopefully a raw Avro file
+ // Need to be able to re-read the bytes read so far, and the
InputStream passed in doesn't support reset. Use the TeeInputStream in
+ // conjunction with SequenceInputStream to glue the two
streams back together for future reading
+ ByteArrayInputStream bais = new
ByteArrayInputStream(baos.toByteArray());
+ SequenceInputStream sis = new SequenceInputStream(bais, in);
+ decoder = DecoderFactory.get().binaryDecoder(sis, null);
+ }
+ if (dataFileStream != null) {
+ // Verify the schemas are the same
+ Schema embeddedSchema = dataFileStream.getSchema();
+ if (!embeddedSchema.equals(avroSchema)) {
+ throw new IOException("Explicit schema does not match
embedded schema");
--- End diff --
Would it be better to throw SchemaValidationException - makes more sense
than IOException?
---