AaronLeon commented on a change in pull request #2718: NIFI-5213: Allow
AvroReader to process files w embedded schema even when the access strategy is
explicit schema
URL: https://github.com/apache/nifi/pull/2718#discussion_r261772316
##########
File path:
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
##########
@@ -17,33 +17,61 @@
package org.apache.nifi.avro;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.io.SequenceInputStream;
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.commons.io.input.TeeInputStream;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.record.RecordSchema;
public class AvroReaderWithExplicitSchema extends AvroRecordReader {
private final InputStream in;
private final RecordSchema recordSchema;
private final DatumReader<GenericRecord> datumReader;
- private final BinaryDecoder decoder;
+ private BinaryDecoder decoder;
private GenericRecord genericRecord;
+ private DataFileStream<GenericRecord> dataFileStream;
- public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException, SchemaNotFoundException {
+ public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException {
this.in = in;
this.recordSchema = recordSchema;
- datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
- decoder = DecoderFactory.get().binaryDecoder(in, null);
+ datumReader = new GenericDatumReader<>(avroSchema);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ TeeInputStream teeInputStream = new TeeInputStream(in, baos);
+ // Try to parse as a DataFileStream, if it works, glue the streams back together and delegate calls to the DataFileStream
Review comment:
It might be worth reversing the try/catch block. I understand the intuition
to check for an embedded schema first, since that is the order in which the
bytes are encoded, but the embedded Avro schema strategy exists for a reason.
As I understand it, a user who chooses AvroReaderWithExplicitSchema over
AvroReaderWithEmbeddedSchema expects to decode Avro data that has no embedded
schema using the explicit schema; data that does contain an embedded schema is
the exceptional case. The current implementation makes that user's flow throw
and catch an exception each time in the common case, which may be expensive.
It may be negligible with sufficient batching of records, so this is just a
nit-picking thought. The TeeInputStream construct is pretty neat though :)
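To make the cost concern concrete, here is a rough sketch of a different way
to tell the two cases apart without throwing on the common path. It is not
what this PR does and not a drop-in patch: it only relies on the fact that an
Avro object container file begins with the four bytes 'O', 'b', 'j', 1. The
class name AvroMagicSniffer and the method hasEmbeddedSchema are hypothetical
names made up for illustration, and the sketch uses only java.io so it can be
read on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.PushbackInputStream;
import java.util.Arrays;

// Hypothetical helper, not part of NiFi or Avro.
public class AvroMagicSniffer {
    // Avro object container files start with these four bytes.
    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    // Peeks at the first bytes and pushes them back, so the caller can still
    // hand the untouched stream to whichever decoder it picks.
    // The stream must have a pushback buffer of at least MAGIC.length bytes.
    static boolean hasEmbeddedSchema(final PushbackInputStream in) throws IOException {
        final byte[] header = new byte[MAGIC.length];
        int read = 0;
        while (read < header.length) {
            final int n = in.read(header, read, header.length - read);
            if (n < 0) {
                break; // stream ended before a full header was available
            }
            read += n;
        }
        if (read > 0) {
            in.unread(header, 0, read); // restore exactly what was consumed
        }
        return read == MAGIC.length && Arrays.equals(header, MAGIC);
    }

    public static void main(final String[] args) throws IOException {
        final byte[] sample = {'O', 'b', 'j', 1, 0};
        final PushbackInputStream in =
                new PushbackInputStream(new ByteArrayInputStream(sample), MAGIC.length);
        System.out.println(hasEmbeddedSchema(in));
    }
}
```

With a check like this, the constructor could branch once on the result
instead of relying on an exception, and the TeeInputStream buffering would no
longer be needed for detection. Whether raw binary data could ever begin with
those same four bytes under a given schema is something I have not checked,
so treat this as a thought experiment rather than a proposal.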