[ https://issues.apache.org/jira/browse/AVRO-3202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Josh Highley updated AVRO-3202:
-------------------------------
Description:
I believe a change in Avro 1.10 has caused a bug in NiFi's Avro reader when a
field in the schema is not in the data and the schema field has an alias. When
the reader searches for data fields in the FlowFile, it first tries the
schema's field name and, if that is not found, tries each alias in turn. The
issue occurs when the field is not in the data. Prior to Avro 1.10.0,
org.apache.avro.generic.GenericData$Record.get(String key) simply returned
null if the field name wasn't found in the schema. In 1.10.0 this was changed
to throw an AvroRuntimeException. The exception is thrown when the reader
tries the alias name after first trying the field name (the data contains no
field with either name).
In my example, this is the schema field:
{noformat}
{"name":"phone_number","type":["null","string"],"default":null,"aliases":["PhoneNumber"]}{noformat}
Stack trace (note that the field name in the error message is the alias
'PhoneNumber', not the field name 'phone_number'):
{noformat}
org.apache.nifi.serialization.MalformedRecordException: Error while getting next record. Root cause: org.apache.avro.AvroRuntimeException: Not a valid schema field: PhoneNumber
    at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:52)
    at sun.reflect.GeneratedMethodAccessor108.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:254)
    at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.access$100(StandardControllerServiceInvocationHandler.java:38)
    at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:240)
    at com.sun.proxy.$Proxy231.nextRecord(Unknown Source)
    at org.apache.nifi.serialization.RecordReader$nextRecord.call(Unknown Source)
    at Script4c2db96c.run(Script4c2db96c.groovy:19)
    at org.apache.nifi.processors.groovyx.ExecuteGroovyScript.onTrigger(ExecuteGroovyScript.java:472)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1174)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: PhoneNumber
    at org.apache.avro.generic.GenericData$Record.get(GenericData.java:256)
    at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:846)
    at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:835)
    at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:45)
    ... 22 common frames omitted
{noformat}
Pertinent code:
[https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java]
{noformat}
public static Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema, final Charset charset) {
    final Map<String, Object> values = new HashMap<>(recordSchema.getFieldCount());
    for (final RecordField recordField : recordSchema.getFields()) {
        Object value = avroRecord.get(recordField.getFieldName());
        if (value == null) {
            for (final String alias : recordField.getAliases()) {
                value = avroRecord.get(alias); // <<<<<< exception occurs here
                if (value != null) { break; }
            }
        }
        // ... (excerpt ends here)
{noformat}
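One way a caller could sidestep the exception is to confirm that a name is
declared before asking the record for it. The sketch below is not NiFi's or
Avro's code: StrictRecord, hasField and lookup are hypothetical stand-ins that
only mimic a get() that throws on undeclared names. With the real API the guard
would presumably be a check that record.getSchema().getField(name) is non-null;
that is an assumption about how a fix might look, not a confirmed patch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GuardedAliasLookup {

    // Hypothetical stand-in for a record whose get() throws on names
    // that are not declared, as Avro 1.10.0 does per this report.
    static final class StrictRecord {
        private final Map<String, Object> declared = new HashMap<>();

        StrictRecord declare(String name, Object value) {
            declared.put(name, value);
            return this;
        }

        boolean hasField(String name) {
            return declared.containsKey(name);
        }

        Object get(String name) {
            if (!declared.containsKey(name)) {
                // Stands in for AvroRuntimeException.
                throw new IllegalStateException("Not a valid schema field: " + name);
            }
            return declared.get(name);
        }
    }

    // Try the field name first, then each alias, but only call get()
    // for names the record actually declares.
    static Object lookup(StrictRecord record, String fieldName, List<String> aliases) {
        Object value = record.hasField(fieldName) ? record.get(fieldName) : null;
        if (value != null) {
            return value;
        }
        for (String alias : aliases) {
            if (!record.hasField(alias)) {
                continue; // skip instead of triggering the exception
            }
            value = record.get(alias);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Field declared under its real name, but with no value in the data.
        StrictRecord record = new StrictRecord().declare("phone_number", null);
        System.out.println(lookup(record, "phone_number", Arrays.asList("PhoneNumber"))); // prints "null"
    }
}
```

With the guard in place, a missing value yields null for the field rather than
failing the whole record.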
*GenericData$Record.get(String key):*
*Avro 1.10.0*
[https://github.com/apache/avro/blob/release-1.10.0/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java]
{noformat}
public Object get(String key) {
    Field field = schema.getField(key);
    if (field == null) {
        throw new AvroRuntimeException("Not a valid schema field: " + key);
    }
    return values[field.pos()];
}{noformat}
*Avro 1.9.2*
[https://github.com/apache/avro/blob/release-1.9.2/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java]
{noformat}
public Object get(String key) {
    Field field = schema.getField(key);
    if (field == null)
        return null;
    return values[field.pos()];
}{noformat}
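The difference between the two versions can be modelled without Avro on the
classpath. In this sketch GetSemantics, getLenient and getStrict are
hypothetical names; the class only imitates the lookup-by-name logic of the
two get(String key) variants in this report, with IllegalStateException
standing in for AvroRuntimeException.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of a schema-backed record; not the Avro class itself.
class GetSemantics {
    private final Map<String, Integer> positions = new LinkedHashMap<>();
    private final Object[] values;

    GetSemantics(String... fieldNames) {
        for (String name : fieldNames) {
            positions.put(name, positions.size()); // position = declaration order
        }
        values = new Object[fieldNames.length];
    }

    void put(String key, Object value) {
        Integer pos = positions.get(key);
        if (pos == null) {
            throw new IllegalArgumentException("Unknown field: " + key);
        }
        values[pos] = value;
    }

    // 1.9.2-style: an undeclared name quietly yields null.
    Object getLenient(String key) {
        Integer pos = positions.get(key);
        return pos == null ? null : values[pos];
    }

    // 1.10.0-style: an undeclared name is an error.
    Object getStrict(String key) {
        Integer pos = positions.get(key);
        if (pos == null) {
            throw new IllegalStateException("Not a valid schema field: " + key);
        }
        return values[pos];
    }

    public static void main(String[] args) {
        // The schema declares phone_number; PhoneNumber is only an alias.
        GetSemantics record = new GetSemantics("phone_number");
        System.out.println(record.getLenient("PhoneNumber")); // prints "null"
        try {
            record.getStrict("PhoneNumber");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "Not a valid schema field: PhoneNumber"
        }
    }
}
```

Because an alias is not itself a declared field name, any caller that probes
aliases through the strict variant fails even when the field is merely empty.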
My ExecuteGroovyScript processor script. The call to createRecordReader
ultimately leads to the exception:
{noformat}
import org.apache.nifi.serialization.record.Record
import org.apache.commons.io.IOUtils

def flowFile = session.get()
if (!flowFile) { log.warn("No flowFile")}
InputStream inputStream
def reader
try {
    inputStream = session.read(flowFile)
    reader = RecordReader.reader.createRecordReader(flowFile, inputStream, log){noformat}
Environment: Nifi 1.12.1 Avro 1.10.2 (was: Nifi 1.12.2 Avro 1.10.2)
> Avro Reader exception when data field not present, and schema has alias
> -----------------------------------------------------------------------
>
> Key: AVRO-3202
> URL: https://issues.apache.org/jira/browse/AVRO-3202
> Project: Apache Avro
> Issue Type: Bug
> Environment: Nifi 1.12.1 Avro 1.10.2
> Reporter: Josh Highley
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)