This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f040c6a NIFI-5213: Allow AvroReader to process files with embedded
schema even when the access strategy is explicit schema; NIFI-5213: Incorporated
review comments
f040c6a is described below
commit f040c6aadbedb0fb5e778e7e13069d66cc97af45
Author: Matthew Burgess <[email protected]>
AuthorDate: Fri May 18 00:29:52 2018 -0400
NIFI-5213: Allow AvroReader to process files with embedded schema even when
the access strategy is explicit schema
NIFI-5213: Incorporated review comments
This closes #2718
Signed-off-by: Mike Thomsen <[email protected]>
---
.../nifi-record-serialization-services/pom.xml | 2 +
.../nifi/avro/AvroReaderWithExplicitSchema.java | 48 ++++++++--
.../avro/TestAvroReaderWithExplicitSchema.java | 99 +++++++++++++++++++++
.../src/test/resources/avro/avro_embed_schema.avro | Bin 0 -> 216 bytes
.../src/test/resources/avro/avro_schemaless.avro | Bin 0 -> 18 bytes
.../src/test/resources/avro/avro_schemaless.avsc | 25 ++++++
6 files changed, 167 insertions(+), 7 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index f17fe3c..4c4ba4c 100755
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -132,6 +132,8 @@
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
+
<exclude>src/test/resources/avro/avro_embed_schema.avro</exclude>
+
<exclude>src/test/resources/avro/avro_schemaless.avro</exclude>
<exclude>src/test/resources/avro/datatypes.avsc</exclude>
<exclude>src/test/resources/avro/decimals.avsc</exclude>
<exclude>src/test/resources/avro/logical-types.avsc</exclude>
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
index 9197e47..9c0dbf5 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
@@ -17,30 +17,59 @@
package org.apache.nifi.avro;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
+import org.apache.commons.io.input.TeeInputStream;
import org.apache.nifi.serialization.record.RecordSchema;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
public class AvroReaderWithExplicitSchema extends AvroRecordReader {
private final InputStream in;
private final RecordSchema recordSchema;
private final DatumReader<GenericRecord> datumReader;
- private final BinaryDecoder decoder;
+ // Decoder over raw (header-less) Avro binary; remains null when the input is an Avro data file
+ private BinaryDecoder decoder;
private GenericRecord genericRecord;
+ // Non-null only when the input is an Avro data file whose embedded schema equals the explicit schema
+ private DataFileStream<GenericRecord> dataFileStream;
- public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) {
+ /**
+ * Creates a reader that decodes records with the given explicit Avro schema. The content may be either raw Avro
+ * binary (no header) or an Avro data file with an embedded schema; in the latter case the embedded schema must be
+ * equal to {@code avroSchema}.
+ *
+ * @param in the Avro content to read; it need not support mark/reset
+ * @param recordSchema the record schema held by this reader
+ * @param avroSchema the explicit Avro schema used to decode records
+ * @throws IOException if the content is an Avro data file whose embedded schema differs from {@code avroSchema},
+ * or if the data file header cannot be read again after probing
+ */
+ public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException {
this.in = in;
this.recordSchema = recordSchema;
datumReader = new NonCachingDatumReader<>(avroSchema);
- decoder = DecoderFactory.get().binaryDecoder(in, null);
+ // 'in' cannot be reset, so every byte the probe below consumes is also copied into 'consumed' for replay
+ final ByteArrayOutputStream consumed = new ByteArrayOutputStream();
+ final TeeInputStream teeInputStream = new TeeInputStream(in, consumed);
+
+ // Probe for an Avro data file header. The probe is intentionally not closed: closing it would also close 'in'.
+ DataFileStream<GenericRecord> probe = null;
+ try {
+ probe = new DataFileStream<>(teeInputStream, new NonCachingDatumReader<>());
+ } catch (final IOException ignored) {
+ // Not an Avro data file; carry on and treat the content as raw Avro binary
+ }
+
+ // In both cases, glue the bytes consumed by the probe back in front of the unread remainder of 'in'
+ final InputStream replayed = replayConsumedBytes(consumed, in);
+ if (probe == null) {
+ decoder = DecoderFactory.get().binaryDecoder(replayed, null);
+ } else {
+ final Schema embeddedSchema = probe.getSchema();
+ if (!embeddedSchema.equals(avroSchema)) {
+ throw new IOException("Explicit schema does not match embedded schema. Explicit schema: " + avroSchema + "; embedded schema: " + embeddedSchema);
+ }
+ // Re-read the data file from its start without the tee, so the content is no longer copied into memory.
+ // The explicit schema is used as the reader schema, as on the raw path; it equals the embedded schema here.
+ dataFileStream = new DataFileStream<>(replayed, new NonCachingDatumReader<>(avroSchema));
+ }
}
+
+ /**
+ * Returns a stream that first yields the bytes captured in {@code consumed} and then the rest of {@code remaining}.
+ */
+ private static InputStream replayConsumedBytes(final ByteArrayOutputStream consumed, final InputStream remaining) {
+ return new SequenceInputStream(new ByteArrayInputStream(consumed.toByteArray()), remaining);
+ }
@Override
@@ -50,6 +79,11 @@ public class AvroReaderWithExplicitSchema extends
AvroRecordReader {
@Override
protected GenericRecord nextAvroRecord() throws IOException {
+ // If the avro file had an embedded schema that matched the explicit
schema, delegate to the DataFileStream for reading records
+ if (dataFileStream != null) {
+ return dataFileStream.hasNext() ? dataFileStream.next() : null;
+ }
+
if (decoder.isEnd()) {
return null;
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java
new file mode 100644
index 0000000..5d3cd00
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TestAvroReaderWithExplicitSchema {
+
+ @Test
+ public void testAvroExplicitReaderWithSchemalessFile() throws Exception {
+ final File schemalessAvroFile = new File("src/test/resources/avro/avro_schemaless.avro");
+ final Schema dataSchema = loadTestSchema();
+ final RecordSchema recordSchema = new SimpleRecordSchema(dataSchema.toString(), AvroTypeUtil.AVRO_SCHEMA_FORMAT, null);
+
+ try (final FileInputStream fileInputStream = new FileInputStream(schemalessAvroFile)) {
+ assertExpectedRecords(new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema));
+ }
+ }
+
+ @Test
+ public void testAvroExplicitReaderWithEmbeddedSchemaFile() throws Exception {
+ final File avroFileWithEmbeddedSchema = new File("src/test/resources/avro/avro_embed_schema.avro");
+ final Schema dataSchema = loadTestSchema();
+ final RecordSchema recordSchema = new SimpleRecordSchema(dataSchema.toString(), AvroTypeUtil.AVRO_SCHEMA_FORMAT, null);
+
+ try (final FileInputStream fileInputStream = new FileInputStream(avroFileWithEmbeddedSchema)) {
+ assertExpectedRecords(new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema));
+ }
+ }
+
+ @Test(expected = IOException.class)
+ public void testAvroExplicitReaderWithEmbeddedSchemaFileDifferentFromExplicitSchema() throws Exception {
+ final File avroFileWithEmbeddedSchema = new File("src/test/resources/avro/avro_embed_schema.avro");
+ final Schema dataSchema = new Schema.Parser().parse("{\"namespace\": \"nifi\",\"name\": \"test\",\"type\": \"record\",\"fields\": [{\"name\": \"id\",\"type\": \"int\"}]}");
+ final RecordSchema recordSchema = new SimpleRecordSchema(dataSchema.toString(), AvroTypeUtil.AVRO_SCHEMA_FORMAT, null);
+
+ try (final FileInputStream fileInputStream = new FileInputStream(avroFileWithEmbeddedSchema)) {
+ // Causes IOException in constructor due to schemas not matching
+ new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema);
+ }
+ }
+
+ /**
+ * Parses the explicit schema shared by the schemaless and embedded-schema test files.
+ */
+ private static Schema loadTestSchema() throws IOException {
+ return new Schema.Parser().parse(new File("src/test/resources/avro/avro_schemaless.avsc"));
+ }
+
+ /**
+ * Verifies that the reader yields exactly the three records stored in the test resources, then signals end of input.
+ */
+ private static void assertExpectedRecords(final AvroReaderWithExplicitSchema avroReader) throws IOException {
+ GenericRecord record = avroReader.nextAvroRecord();
+ assertNotNull(record);
+ assertEquals(1, record.get("id"));
+ assertNotNull(record.get("key"));
+ assertEquals("value", record.get("key").toString());
+
+ record = avroReader.nextAvroRecord();
+ assertNotNull(record);
+ assertEquals(2, record.get("id"));
+ assertNull(record.get("key"));
+
+ record = avroReader.nextAvroRecord();
+ // Check for null first so a missing record fails the assertion instead of throwing a NullPointerException
+ assertNotNull(record);
+ assertEquals(3, record.get("id"));
+ assertNotNull(record.get("key"));
+ assertEquals("hello", record.get("key").toString());
+
+ // The reader signals end of input with null
+ assertNull(avroReader.nextAvroRecord());
+ }
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_embed_schema.avro
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_embed_schema.avro
new file mode 100644
index 0000000..b2b0875
Binary files /dev/null and
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_embed_schema.avro
differ
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless.avro
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless.avro
new file mode 100644
index 0000000..d488a9c
Binary files /dev/null and
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless.avro
differ
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless.avsc
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless.avsc
new file mode 100644
index 0000000..dd84de4
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/avro_schemaless.avsc
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace": "nifi",
+ "name": "test",
+ "type": "record",
+ "fields": [
+ {"name": "id","type": "int"},
+ {"name": "key","type": ["null", "string"]}
+ ]
+}
\ No newline at end of file