[ 
https://issues.apache.org/jira/browse/NIFI-5213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16482556#comment-16482556
 ] 

ASF GitHub Bot commented on NIFI-5213:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2718#discussion_r189606932
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
 ---
    @@ -17,33 +17,61 @@
     
     package org.apache.nifi.avro;
     
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
     import java.io.EOFException;
     import java.io.IOException;
     import java.io.InputStream;
    +import java.io.SequenceInputStream;
     
     import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
     import org.apache.avro.generic.GenericDatumReader;
     import org.apache.avro.generic.GenericRecord;
     import org.apache.avro.io.BinaryDecoder;
     import org.apache.avro.io.DatumReader;
     import org.apache.avro.io.DecoderFactory;
    -import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.commons.io.input.TeeInputStream;
     import org.apache.nifi.serialization.MalformedRecordException;
     import org.apache.nifi.serialization.record.RecordSchema;
     
     public class AvroReaderWithExplicitSchema extends AvroRecordReader {
         private final InputStream in;
         private final RecordSchema recordSchema;
         private final DatumReader<GenericRecord> datumReader;
    -    private final BinaryDecoder decoder;
    +    private BinaryDecoder decoder;
         private GenericRecord genericRecord;
    +    private DataFileStream<GenericRecord> dataFileStream;
     
    -    public AvroReaderWithExplicitSchema(final InputStream in, final 
RecordSchema recordSchema, final Schema avroSchema) throws IOException, 
SchemaNotFoundException {
    +    public AvroReaderWithExplicitSchema(final InputStream in, final 
RecordSchema recordSchema, final Schema avroSchema) throws IOException {
             this.in = in;
             this.recordSchema = recordSchema;
     
    -        datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
    -        decoder = DecoderFactory.get().binaryDecoder(in, null);
    +        datumReader = new GenericDatumReader<>(avroSchema);
    +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +        TeeInputStream teeInputStream = new TeeInputStream(in, baos);
    +        // Try to parse as a DataFileStream, if it works, glue the streams 
back together and delegate calls to the DataFileStream
    +        try {
    +            dataFileStream = new DataFileStream<>(teeInputStream, new 
GenericDatumReader<>());
    +        } catch (IOException ioe) {
    +            // Carry on, hopefully a raw Avro file
    +            // Need to be able to re-read the bytes read so far, and the 
InputStream passed in doesn't support reset. Use the TeeInputStream in
    +            // conjunction with SequenceInputStream to glue the two 
streams back together for future reading
    +            ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
    +            SequenceInputStream sis = new SequenceInputStream(bais, in);
    +            decoder = DecoderFactory.get().binaryDecoder(sis, null);
    +        }
    +        if (dataFileStream != null) {
    +            // Verify the schemas are the same
    +            Schema embeddedSchema = dataFileStream.getSchema();
    +            if (!embeddedSchema.equals(avroSchema)) {
    +                throw new IOException("Explicit schema does not match 
embedded schema");
    --- End diff --
    
    It could be, if SchemaValidationException extended IOException instead of 
RuntimeException. The processors that end up creating this reader handle 
IOException but often handle a runtime exception with rollback (I couldn't find 
anything that explicitly catches SchemaValidationException. Also, although 
Javadoc is missing for that class, in RecordReader there is Javadoc for a 
parameter that says:
    
    `throws SchemaValidationException if a Record contains a field that 
violates the schema and cannot be coerced into the appropriate field type.`
    
    That's not exactly what's going on here, so I thought to keep it an 
IOException which may be handled by routing to failure rather than a rollback 
(which would prevent a flow from continuing if an Avro file showed up with an 
embedded that didn't exactly match the explicit one. It's still best practice 
to Use Embedded Schema instead of an explicit one, this was just supposed to 
make it a little easier on the user if they configured it differently.


> Allow AvroReader with explicit schema to read files with embedded schema
> ------------------------------------------------------------------------
>
>                 Key: NIFI-5213
>                 URL: https://issues.apache.org/jira/browse/NIFI-5213
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>            Priority: Minor
>
> AvroReader allows the choice of schema access strategy from such options as 
> Use Embedded Schema, Use Schema Name, Use Schema Text, etc. If the incoming 
> Avro files will have embedded schemas, then Use Embedded Schema is best 
> practice for the Avro Reader. However it is not intuitive that if the same 
> schema that is embedded in the file is specified by name (using a schema 
> registry) or explicitly via Schema Text, that errors can occur. This has been 
> noticed in QueryRecord for example, and the error is also not intuitive or 
> descriptive (it is often an ArrayIndexOutOfBoundsException).
> To provide a better user experience, it would be an improvement for 
> AvroReader to be able to successfully process Avro files with embedded 
> schemas, even when the Schema Access Strategy is not "Use Embedded Schema". 
> Of course, the explicit schema would have to match the embedded schema, or an 
> error would be reported (and rightfully so).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to