ASF GitHub Bot commented on NIFI-5213:

Github user mattyb149 commented on a diff in the pull request:

    --- Diff: 
    @@ -17,33 +17,61 @@
     package org.apache.nifi.avro;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
     import java.io.EOFException;
     import java.io.IOException;
     import java.io.InputStream;
    +import java.io.SequenceInputStream;
     import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
     import org.apache.avro.generic.GenericDatumReader;
     import org.apache.avro.generic.GenericRecord;
     import org.apache.avro.io.BinaryDecoder;
     import org.apache.avro.io.DatumReader;
     import org.apache.avro.io.DecoderFactory;
    -import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.commons.io.input.TeeInputStream;
     import org.apache.nifi.serialization.MalformedRecordException;
     import org.apache.nifi.serialization.record.RecordSchema;
     public class AvroReaderWithExplicitSchema extends AvroRecordReader {
         private final InputStream in;
         private final RecordSchema recordSchema;
         private final DatumReader<GenericRecord> datumReader;
    -    private final BinaryDecoder decoder;
    +    private BinaryDecoder decoder;
         private GenericRecord genericRecord;
    +    private DataFileStream<GenericRecord> dataFileStream;
    -    public AvroReaderWithExplicitSchema(final InputStream in, final 
RecordSchema recordSchema, final Schema avroSchema) throws IOException, 
SchemaNotFoundException {
    +    public AvroReaderWithExplicitSchema(final InputStream in, final 
RecordSchema recordSchema, final Schema avroSchema) throws IOException {
             this.in = in;
             this.recordSchema = recordSchema;
    -        datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
    -        decoder = DecoderFactory.get().binaryDecoder(in, null);
    +        datumReader = new GenericDatumReader<>(avroSchema);
    +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +        TeeInputStream teeInputStream = new TeeInputStream(in, baos);
    +        // Try to parse as a DataFileStream, if it works, glue the streams 
back together and delegate calls to the DataFileStream
    +        try {
    +            dataFileStream = new DataFileStream<>(teeInputStream, new 
    --- End diff --
    I checked this, it's a bit messy but I wasn't sure how else to do it. 
`decoder` is only used when parsing a "raw" Avro file, one without an embedded 
schema. For these changes, there are two scenarios:
    1) The input is a raw Avro file. Then line 55 will throw an IOException and 
`decoder` will be set as it used to be (but this time in the catch). Then in 
nextAvroRecord it will drop through the `dataFileStream != null` check and use 
decoder (which is not null).
    2) The input is an Avro file with an embedded schema. Then `dataFileStream` 
will not be null and nextAvroRecord() will return at line 86, before `decoder` 
is used.

> Allow AvroReader with explicit schema to read files with embedded schema
> ------------------------------------------------------------------------
>                 Key: NIFI-5213
>                 URL: https://issues.apache.org/jira/browse/NIFI-5213
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>            Priority: Minor
> AvroReader allows the choice of schema access strategy from such options as 
> Use Embedded Schema, Use Schema Name, Use Schema Text, etc. If the incoming 
> Avro files will have embedded schemas, then Use Embedded Schema is best 
> practice for the Avro Reader. However it is not intuitive that if the same 
> schema that is embedded in the file is specified by name (using a schema 
> registry) or explicitly via Schema Text, that errors can occur. This has been 
> noticed in QueryRecord for example, and the error is also not intuitive or 
> descriptive (it is often an ArrayIndexOutOfBoundsException).
> To provide a better user experience, it would be an improvement for 
> AvroReader to be able to successfully process Avro files with embedded 
> schemas, even when the Schema Access Strategy is not "Use Embedded Schema". 
> Of course, the explicit schema would have to match the embedded schema, or an 
> error would be reported (and rightfully so).

This message was sent by Atlassian JIRA

Reply via email to