This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f040c6a  NIFI-5213: Allow AvroReader to process files with an embedded schema even when the access strategy is explicit schema; NIFI-5213: Incorporated review comments
f040c6a is described below

commit f040c6aadbedb0fb5e778e7e13069d66cc97af45
Author: Matthew Burgess <[email protected]>
AuthorDate: Fri May 18 00:29:52 2018 -0400

    NIFI-5213: Allow AvroReader to process files with an embedded schema even when the access strategy is explicit schema
    NIFI-5213: Incorporated review comments
    
    This closes #2718
    
    Signed-off-by: Mike Thomsen <[email protected]>
---
 .../nifi-record-serialization-services/pom.xml     |   2 +
 .../nifi/avro/AvroReaderWithExplicitSchema.java    |  48 ++++++++--
 .../avro/TestAvroReaderWithExplicitSchema.java     |  99 +++++++++++++++++++++
 .../src/test/resources/avro/avro_embed_schema.avro | Bin 0 -> 216 bytes
 .../src/test/resources/avro/avro_schemaless.avro   | Bin 0 -> 18 bytes
 .../src/test/resources/avro/avro_schemaless.avsc   |  25 ++++++
 6 files changed, 167 insertions(+), 7 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index f17fe3c..4c4ba4c 100755
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -132,6 +132,8 @@
                 <artifactId>apache-rat-plugin</artifactId>
                 <configuration>
                     <excludes combine.children="append">
+                        
<exclude>src/test/resources/avro/avro_embed_schema.avro</exclude>
+                        
<exclude>src/test/resources/avro/avro_schemaless.avro</exclude>
                         
<exclude>src/test/resources/avro/datatypes.avsc</exclude>
                         
<exclude>src/test/resources/avro/decimals.avsc</exclude>
                         
<exclude>src/test/resources/avro/logical-types.avsc</exclude>
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
index 9197e47..9c0dbf5 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
@@ -17,30 +17,59 @@
 
 package org.apache.nifi.avro;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.commons.io.input.TeeInputStream;
 import org.apache.nifi.serialization.record.RecordSchema;
 
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
 public class AvroReaderWithExplicitSchema extends AvroRecordReader {
     private final InputStream in;
     private final RecordSchema recordSchema;
     private final DatumReader<GenericRecord> datumReader;
-    private final BinaryDecoder decoder;
+    private BinaryDecoder decoder;
     private GenericRecord genericRecord;
+    private DataFileStream<GenericRecord> dataFileStream;
 
-    public AvroReaderWithExplicitSchema(final InputStream in, final 
RecordSchema recordSchema, final Schema avroSchema) {
+    public AvroReaderWithExplicitSchema(final InputStream in, final 
RecordSchema recordSchema, final Schema avroSchema) throws IOException {
         this.in = in;
         this.recordSchema = recordSchema;
 
         datumReader = new NonCachingDatumReader<>(avroSchema);
-        decoder = DecoderFactory.get().binaryDecoder(in, null);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        TeeInputStream teeInputStream = new TeeInputStream(in, baos);
+        // Try to parse as a DataFileStream, if it works, glue the streams 
back together and delegate calls to the DataFileStream
+        try {
+            dataFileStream = new DataFileStream<>(teeInputStream, new 
NonCachingDatumReader<>());
+        } catch (IOException ioe) {
+            // Carry on, hopefully a raw Avro file
+            // Need to be able to re-read the bytes read so far, and the 
InputStream passed in doesn't support reset. Use the TeeInputStream in
+            // conjunction with SequenceInputStream to glue the two streams 
back together for future reading
+            ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
+            SequenceInputStream sis = new SequenceInputStream(bais, in);
+            decoder = DecoderFactory.get().binaryDecoder(sis, null);
+        }
+        if (dataFileStream != null) {
+            // Verify the schemas are the same
+            Schema embeddedSchema = dataFileStream.getSchema();
+            if (!embeddedSchema.equals(avroSchema)) {
+                throw new IOException("Explicit schema does not match embedded 
schema");
+            }
+            // Need to be able to re-read the bytes read so far, but we don't 
want to copy the input to a byte array anymore, so get rid of the TeeInputStream
+            ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
+            SequenceInputStream sis = new SequenceInputStream(bais, in);
+            dataFileStream = new DataFileStream<>(sis, new 
NonCachingDatumReader<>());
+        }
     }
 
     @Override
@@ -50,6 +79,11 @@ public class AvroReaderWithExplicitSchema extends 
AvroRecordReader {
 
     @Override
     protected GenericRecord nextAvroRecord() throws IOException {
+        // If the avro file had an embedded schema that matched the explicit 
schema, delegate to the DataFileStream for reading records
+        if (dataFileStream != null) {
+            return dataFileStream.hasNext() ? dataFileStream.next() : null;
+        }
+
         if (decoder.isEnd()) {
             return null;
         }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java
new file mode 100644
index 0000000..5d3cd00
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TestAvroReaderWithExplicitSchema {
+
+    @Test
+    public void testAvroExplicitReaderWithSchemalessFile() throws Exception {
+        File avroFileWithEmbeddedSchema = new 
File("src/test/resources/avro/avro_schemaless.avro");
+        FileInputStream fileInputStream = new 
FileInputStream(avroFileWithEmbeddedSchema);
+        Schema dataSchema = new Schema.Parser().parse(new 
File("src/test/resources/avro/avro_schemaless.avsc"));
+        RecordSchema recordSchema = new 
SimpleRecordSchema(dataSchema.toString(), AvroTypeUtil.AVRO_SCHEMA_FORMAT, 
null);
+
+        AvroReaderWithExplicitSchema avroReader = new 
AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema);
+        GenericRecord record = avroReader.nextAvroRecord();
+        assertNotNull(record);
+        assertEquals(1, record.get("id"));
+        assertNotNull(record.get("key"));
+        assertEquals("value", record.get("key").toString());
+
+        record = avroReader.nextAvroRecord();
+        assertNotNull(record);
+        assertEquals(2, record.get("id"));
+        assertNull(record.get("key"));
+
+        record = avroReader.nextAvroRecord();
+        assertEquals(3, record.get("id"));
+        assertNotNull(record.get("key"));
+        assertEquals("hello", record.get("key").toString());
+        record = avroReader.nextAvroRecord();
+        assertNull(record);
+    }
+
+    @Test
+    public void testAvroExplicitReaderWithEmbeddedSchemaFile() throws 
Exception {
+        File avroFileWithEmbeddedSchema = new 
File("src/test/resources/avro/avro_embed_schema.avro");
+        FileInputStream fileInputStream = new 
FileInputStream(avroFileWithEmbeddedSchema);
+        Schema dataSchema = new Schema.Parser().parse(new 
File("src/test/resources/avro/avro_schemaless.avsc"));
+        RecordSchema recordSchema = new 
SimpleRecordSchema(dataSchema.toString(), AvroTypeUtil.AVRO_SCHEMA_FORMAT, 
null);
+
+        AvroReaderWithExplicitSchema avroReader = new 
AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema);
+        GenericRecord record = avroReader.nextAvroRecord();
+        assertNotNull(record);
+        assertEquals(1, record.get("id"));
+        assertNotNull(record.get("key"));
+        assertEquals("value", record.get("key").toString());
+
+        record = avroReader.nextAvroRecord();
+        assertNotNull(record);
+        assertEquals(2, record.get("id"));
+        assertNull(record.get("key"));
+
+        record = avroReader.nextAvroRecord();
+        assertEquals(3, record.get("id"));
+        assertNotNull(record.get("key"));
+        assertEquals("hello", record.get("key").toString());
+        record = avroReader.nextAvroRecord();
+        assertNull(record);
+    }
+
+    @Test(expected = IOException.class)
+    public void 
testAvroExplicitReaderWithEmbeddedSchemaFileDifferentFromExplicitSchema() 
throws Exception {
+        File avroFileWithEmbeddedSchema = new 
File("src/test/resources/avro/avro_embed_schema.avro");
+        FileInputStream fileInputStream = new 
FileInputStream(avroFileWithEmbeddedSchema);
+        Schema dataSchema = new Schema.Parser().parse("{\"namespace\": 
\"nifi\",\"name\": \"test\",\"type\": \"record\",\"fields\": [{\"name\": 
\"id\",\"type\": \"int\"}]}");
+        RecordSchema recordSchema = new 
SimpleRecordSchema(dataSchema.toString(), AvroTypeUtil.AVRO_SCHEMA_FORMAT, 
null);
+
+        // Causes IOException in constructor due to schemas not matching
+        new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, 
dataSchema);
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_embed_schema.avro
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_embed_schema.avro
new file mode 100644
index 0000000..b2b0875
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_embed_schema.avro
 differ
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless.avro
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless.avro
new file mode 100644
index 0000000..d488a9c
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless.avro
 differ
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless.avsc
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless.avsc
new file mode 100644
index 0000000..dd84de4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace": "nifi",
+ "name": "test",
+ "type": "record",
+ "fields": [
+  {"name": "id","type": "int"},
+  {"name": "key","type": ["null", "string"]}
+ ]
+}
\ No newline at end of file

Reply via email to