This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new ba2e24b68f NIFI-12745: Fix AvroReader silently dropping malformed records
ba2e24b68f is described below
commit ba2e24b68f036363f333e216ee968d999a59e268
Author: Rajmund Takacs <[email protected]>
AuthorDate: Tue Feb 6 16:03:56 2024 +0100
NIFI-12745: Fix AvroReader silently dropping malformed records
This closes #8361.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../nifi-record-serialization-services/pom.xml | 2 +
.../nifi/avro/AvroReaderWithExplicitSchema.java | 2 +-
.../avro/TestAvroReaderWithExplicitSchema.java | 54 ++++++++++++++++-----
.../resources/avro/schemaless_simple_record.avro | Bin 0 -> 9 bytes
.../resources/avro/schemaless_simple_record.avsc | 28 +++++++++++
.../avro/schemaless_simple_record_extra_field.avsc | 24 +++++++++
6 files changed, 96 insertions(+), 14 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index ebc8fd2f00..5e84a59faf 100755
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -169,6 +169,8 @@
<exclude>src/test/resources/avro/multiple-types.avsc</exclude>
<exclude>src/test/resources/avro/simple.avsc</exclude>
<exclude>src/test/resources/avro/recursive.avsc</exclude>
+ <exclude>src/test/resources/avro/schemaless_simple_record.avsc</exclude>
+ <exclude>src/test/resources/avro/schemaless_simple_record_extra_field.avsc</exclude>
<exclude>src/test/resources/cef/empty-row.txt</exclude>
<exclude>src/test/resources/cef/misformatted-row.txt</exclude>
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
index ab20aad811..6f22123170 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
@@ -91,7 +91,7 @@ public class AvroReaderWithExplicitSchema extends AvroRecordReader {
try {
genericRecord = datumReader.read(null, decoder);
} catch (final EOFException eof) {
- return null;
+ throw new IOException("Was expecting more data, but reached EOF.", eof);
}
return genericRecord;
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java
index 6a3a639161..9bbaa55bb7 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java
@@ -16,14 +16,10 @@
*/
package org.apache.nifi.avro;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.io.File;
import java.io.FileInputStream;
@@ -32,11 +28,15 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Test;
public class TestAvroReaderWithExplicitSchema {
@@ -105,6 +105,34 @@ public class TestAvroReaderWithExplicitSchema {
assertThrows(IOException.class, () -> new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema));
}
+ @Test
+ public void testAvroExplicitReaderWithSchemalessFileAndExplicitSchema() throws Exception {
+ AvroReaderWithExplicitSchema avroReader =
createAvroReaderWithExplicitSchema(
+ "src/test/resources/avro/schemaless_simple_record.avro",
+ "src/test/resources/avro/schemaless_simple_record.avsc"
+ );
+
+ GenericData.Record expected = new GenericData.Record(new Schema.Parser().parse(new File("src/test/resources/avro/schemaless_simple_record.avsc")));
+ expected.put("field_1", 123);
+ expected.put("field_2", "44");
+ expected.put("field_3", 5);
+
+ GenericRecord actual1 = avroReader.nextAvroRecord();
+ assertEquals(expected, actual1);
+
+ GenericRecord actual2 = avroReader.nextAvroRecord();
+ assertNull(actual2);
+ }
+
+ @Test
+ public void testAvroExplicitReaderWithSchemalessFileAndWrongExplicitSchema() throws Exception {
+ AvroReaderWithExplicitSchema avroReader =
createAvroReaderWithExplicitSchema(
+ "src/test/resources/avro/schemaless_simple_record.avro",
+ "src/test/resources/avro/schemaless_simple_record_extra_field.avsc"
+ );
+ assertThrows(IOException.class, avroReader::nextAvroRecord);
+ }
+
@Test
public void
testAvroExplicitReaderWithSchemalessFileDecimalValuesWithDifferentBufferSize()
throws Exception {
// GIVEN
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avro
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avro
new file mode 100644
index 0000000000..858062de33
Binary files /dev/null and
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avro
differ
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc
new file mode 100644
index 0000000000..788b7b73c7
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc
@@ -0,0 +1,28 @@
+{
+ "type": "record",
+ "name": "nifiRecord",
+ "namespace": "org.apache.nifi",
+ "fields": [
+ {
+ "name": "field_1",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "field_2",
+ "type": [
+ "string",
+ "null"
+ ]
+ },
+ {
+ "name": "field_3",
+ "type": [
+ "int",
+ "null"
+ ]
+ }
+ ]
+}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc
new file mode 100644
index 0000000000..276107a0e1
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc
@@ -0,0 +1,24 @@
+{
+ "type":"record",
+ "name":"message_name",
+ "namespace":"message_namespace",
+ "fields":[
+ {
+ "name":"field_1",
+ "type":["long"]
+ },
+ {
+ "name":"field_2",
+ "type":["string"]
+ },
+ {
+ "name":"field_3",
+ "type":["int"]
+ },
+ {
+ "name":"extra",
+ "type":["long"]
+ }
+ ]
+}
+