GitHub user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2718#discussion_r189606932
--- Diff:
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
---
@@ -17,33 +17,61 @@
package org.apache.nifi.avro;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.io.SequenceInputStream;
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.commons.io.input.TeeInputStream;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.record.RecordSchema;
public class AvroReaderWithExplicitSchema extends AvroRecordReader {
private final InputStream in;
private final RecordSchema recordSchema;
private final DatumReader<GenericRecord> datumReader;
- private final BinaryDecoder decoder;
+ private BinaryDecoder decoder;
private GenericRecord genericRecord;
+ private DataFileStream<GenericRecord> dataFileStream;
- public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException, SchemaNotFoundException {
+ public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException {
this.in = in;
this.recordSchema = recordSchema;
- datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
- decoder = DecoderFactory.get().binaryDecoder(in, null);
+ datumReader = new GenericDatumReader<>(avroSchema);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ TeeInputStream teeInputStream = new TeeInputStream(in, baos);
+ // Try to parse as a DataFileStream, if it works, glue the streams back together and delegate calls to the DataFileStream
+ try {
+ dataFileStream = new DataFileStream<>(teeInputStream, new GenericDatumReader<>());
+ } catch (IOException ioe) {
+ // Carry on, hopefully a raw Avro file
+ // Need to be able to re-read the bytes read so far, and the InputStream passed in doesn't support reset. Use the TeeInputStream in
+ // conjunction with SequenceInputStream to glue the two streams back together for future reading
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ SequenceInputStream sis = new SequenceInputStream(bais, in);
+ decoder = DecoderFactory.get().binaryDecoder(sis, null);
+ }
+ if (dataFileStream != null) {
+ // Verify the schemas are the same
+ Schema embeddedSchema = dataFileStream.getSchema();
+ if (!embeddedSchema.equals(avroSchema)) {
+ throw new IOException("Explicit schema does not match embedded schema");
--- End diff --
This could be a SchemaValidationException instead, if that class extended IOException rather than RuntimeException. The processors that end up creating this reader handle IOException, but they often handle a runtime exception with a rollback (I couldn't find anything that explicitly catches SchemaValidationException). Also, although Javadoc is missing for that class, the Javadoc in RecordReader has a `throws` entry that says:
`throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate field type.`
That's not exactly what's going on here, so I chose to keep it an IOException, which may be handled by routing to failure rather than by a rollback. A rollback would prevent a flow from continuing whenever an Avro file showed up with an embedded schema that didn't exactly match the explicit one. It's still best practice to Use Embedded Schema rather than an explicit one; this change was just meant to make things a little easier for users who configured the reader differently.
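To make the trade-off concrete, here is a simplified model of the handling pattern described above. It is a hypothetical sketch, not NiFi's `ProcessSession` API: `process`, `ReaderTask`, and `Outcome` are made-up names, and the outcomes only stand in for routing to failure versus rolling back.

```java
import java.io.IOException;

public class ExceptionRoutingSketch {

    /** Hypothetical outcomes standing in for "transfer to failure" vs. "roll back the session". */
    enum Outcome { SUCCESS, ROUTED_TO_FAILURE, ROLLED_BACK }

    /** Hypothetical unit of work, e.g. reading records with the configured reader. */
    @FunctionalInterface
    interface ReaderTask {
        void run() throws IOException;
    }

    /** Models the common processor pattern (an assumption for illustration, not NiFi code). */
    static Outcome process(ReaderTask task) {
        try {
            task.run();
            return Outcome.SUCCESS;
        } catch (IOException e) {
            // Checked I/O problems: this one flow file goes to failure and the flow keeps moving.
            return Outcome.ROUTED_TO_FAILURE;
        } catch (RuntimeException e) {
            // Runtime errors: the session is rolled back and the flow file returns to its queue,
            // so a permanently bad file keeps the flow from continuing.
            return Outcome.ROLLED_BACK;
        }
    }

    public static void main(String[] args) {
        Outcome mismatch = process(() -> {
            throw new IOException("Explicit schema does not match embedded schema");
        });
        Outcome runtime = process(() -> {
            throw new IllegalStateException("stand-in for a RuntimeException such as SchemaValidationException");
        });
        System.out.println(mismatch); // ROUTED_TO_FAILURE
        System.out.println(runtime);  // ROLLED_BACK
    }
}
```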
---