mattyb149 commented on a change in pull request #2718: NIFI-5213: Allow AvroReader to process files w embedded schema even when the access strategy is explicit schema
URL: https://github.com/apache/nifi/pull/2718#discussion_r316213795
 
 

 ##########
 File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
 ##########
 @@ -17,33 +17,61 @@
 
 package org.apache.nifi.avro;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.SequenceInputStream;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.commons.io.input.TeeInputStream;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.record.RecordSchema;
 
 public class AvroReaderWithExplicitSchema extends AvroRecordReader {
     private final InputStream in;
     private final RecordSchema recordSchema;
     private final DatumReader<GenericRecord> datumReader;
-    private final BinaryDecoder decoder;
+    private BinaryDecoder decoder;
     private GenericRecord genericRecord;
+    private DataFileStream<GenericRecord> dataFileStream;
 
 -    public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException, SchemaNotFoundException {
 +    public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException {
         this.in = in;
         this.recordSchema = recordSchema;
 
-        datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
-        decoder = DecoderFactory.get().binaryDecoder(in, null);
+        datumReader = new GenericDatumReader<>(avroSchema);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        TeeInputStream teeInputStream = new TeeInputStream(in, baos);
 +        // Try to parse as a DataFileStream, if it works, glue the streams back together and delegate calls to the DataFileStream
 
 Review comment:
   That's a fair point about reversing the try/catch, and I did try it. The problem was that no error occurs on a raw (schemaless) file until you actually read from it, and that read happens outside the constructor, during the record read loop. With the current approach the exception occurs in the constructor even when the AvroReader is configured correctly, but it only happens once per flow file, so the expense is amortized over the size of the flow file, as you pointed out. Also, in my experience, users are more likely to be working with files that have an embedded schema while also explicitly defining one than they are to be working with schemaless files. That doesn't make a lot of sense (and may just be my perception/experience), but I wrote the Jira because of how often people ran into this issue.
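   For reference, the core trick the PR relies on can be sketched with only the JDK: buffer whatever bytes the probe consumes, then glue them back in front of the remaining stream with `SequenceInputStream` so the input can be re-read from the beginning. This is a minimal stdlib-only sketch of that idea, not the PR's actual implementation (which uses commons-io's `TeeInputStream` plus Avro's `DataFileStream`); the class and method names (`StreamGlueSketch`, `Probe`, `probeAndRewind`) and the magic-byte check are illustrative assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Arrays;

public class StreamGlueSketch {

    // Avro data files begin with the 4-byte magic "Obj" followed by 0x01.
    static final byte[] AVRO_MAGIC = {'O', 'b', 'j', 1};

    /** Holds the probe result and a stream rewound to the very beginning. */
    public static final class Probe {
        public final boolean isDataFile;
        public final InputStream rewound;

        Probe(final boolean isDataFile, final InputStream rewound) {
            this.isDataFile = isDataFile;
            this.rewound = rewound;
        }
    }

    /**
     * Probe the head of the stream for the Avro data-file magic. The bytes
     * consumed by the probe are buffered and glued back in front of the
     * untouched remainder, so the caller can read the input from the start.
     */
    public static Probe probeAndRewind(final InputStream in) throws IOException {
        final byte[] head = new byte[AVRO_MAGIC.length];
        int n = 0;
        while (n < head.length) {
            final int r = in.read(head, n, head.length - n);
            if (r < 0) {
                break; // stream is shorter than the magic
            }
            n += r;
        }
        final boolean isDataFile = n == AVRO_MAGIC.length && Arrays.equals(head, AVRO_MAGIC);
        // Glue: replay the consumed bytes first, then the rest of the stream.
        final InputStream rewound = new SequenceInputStream(new ByteArrayInputStream(head, 0, n), in);
        return new Probe(isDataFile, rewound);
    }
}
```

   In the real reader the buffered bytes are everything `DataFileStream` consumed while attempting to parse the header, not just a fixed four-byte probe, but the rewind mechanism is the same.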

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
