[
https://issues.apache.org/jira/browse/NIFI-5213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510191#comment-16510191
]
ASF GitHub Bot commented on NIFI-5213:
--------------------------------------
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2718#discussion_r194885270
--- Diff:
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
---
@@ -17,33 +17,61 @@
package org.apache.nifi.avro;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.io.SequenceInputStream;
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.commons.io.input.TeeInputStream;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.record.RecordSchema;
public class AvroReaderWithExplicitSchema extends AvroRecordReader {
private final InputStream in;
private final RecordSchema recordSchema;
private final DatumReader<GenericRecord> datumReader;
- private final BinaryDecoder decoder;
+ private BinaryDecoder decoder;
private GenericRecord genericRecord;
+ private DataFileStream<GenericRecord> dataFileStream;
- public AvroReaderWithExplicitSchema(final InputStream in, final
RecordSchema recordSchema, final Schema avroSchema) throws IOException,
SchemaNotFoundException {
+ public AvroReaderWithExplicitSchema(final InputStream in, final
RecordSchema recordSchema, final Schema avroSchema) throws IOException {
this.in = in;
this.recordSchema = recordSchema;
- datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
- decoder = DecoderFactory.get().binaryDecoder(in, null);
+ datumReader = new GenericDatumReader<>(avroSchema);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ TeeInputStream teeInputStream = new TeeInputStream(in, baos);
+ // Try to parse as a DataFileStream, if it works, glue the streams
back together and delegate calls to the DataFileStream
+ try {
+ dataFileStream = new DataFileStream<>(teeInputStream, new
GenericDatumReader<>());
--- End diff --
I checked this, it's a bit messy but I wasn't sure how else to do it.
`decoder` is only used when parsing a "raw" Avro file, one without an embedded
schema. For these changes, there are two scenarios:
1) The input is a raw Avro file. Then line 55 will throw an IOException and
`decoder` will be set as it used to be (but this time in the catch). Then in
nextAvroRecord it will drop through the `dataFileStream != null` check and use
decoder (which is not null).
2) The input is an Avro file with an embedded schema. Then `dataFileStream`
will not be null and nextAvroRecord() will return at line 86, before `decoder`
is used.
> Allow AvroReader with explicit schema to read files with embedded schema
> ------------------------------------------------------------------------
>
> Key: NIFI-5213
> URL: https://issues.apache.org/jira/browse/NIFI-5213
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Matt Burgess
> Assignee: Matt Burgess
> Priority: Minor
>
> AvroReader allows the choice of schema access strategy from such options as
> Use Embedded Schema, Use Schema Name, Use Schema Text, etc. If the incoming
> Avro files will have embedded schemas, then Use Embedded Schema is best
> practice for the Avro Reader. However it is not intuitive that if the same
> schema that is embedded in the file is specified by name (using a schema
> registry) or explicitly via Schema Text, that errors can occur. This has been
> noticed in QueryRecord for example, and the error is also not intuitive or
> descriptive (it is often an ArrayIndexOutOfBoundsException).
> To provide a better user experience, it would be an improvement for
> AvroReader to be able to successfully process Avro files with embedded
> schemas, even when the Schema Access Strategy is not "Use Embedded Schema".
> Of course, the explicit schema would have to match the embedded schema, or an
> error would be reported (and rightfully so).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)