[ https://issues.apache.org/jira/browse/NIFI-7249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matt Burgess reassigned NIFI-7249:
----------------------------------

    Assignee: Matt Burgess

> [Regression] AvroReader: Could not parse incoming data
> ------------------------------------------------------
>
>                 Key: NIFI-7249
>                 URL: https://issues.apache.org/jira/browse/NIFI-7249
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 1.10.0, 1.11.0, 1.11.1, 1.11.2, 1.11.3
>         Environment: Debian, Java 11 and Java 8
>            Reporter: Philipp Leufke
>            Assignee: Matt Burgess
>            Priority: Major
>         Attachments: AvroReader_bug_MWE.xml
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> *Assessment*
> {code:java}
> AvroTypeUtil.convertUnionFieldValue
> {code}
> has the following:
> {code:java}
>         Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType(
>                 originalValue,
>                 fieldSchema.getTypes().stream().filter(schema -> schema.getType() != Type.NULL).collect(Collectors.toList()),
>                 subSchema -> AvroTypeUtil.determineDataType(subSchema)
>         );
> {code}
> {{DataTypeUtils.findMostSuitableType}}, in turn, contains:
> {code:java}
> DataType inferredDataType = inferDataType(value, null);
> {code}
> {{DataTypeUtils.inferDataType}}, in turn, contains:
> {code:java}
>         if (value instanceof Map) {
>             final Map<String, ?> map = (Map<String, ?>) value;
> {code}
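> Here {{originalValue}} / {{value}} is a map extracted from an Avro record, and 
> its keys are {{org.apache.avro.util.Utf8}} instances rather than {{String}}s. 
> Because generics are erased at runtime, the cast itself succeeds; the 
> {{ClassCastException}} is thrown later, in {{inferRecordDataType}}, when a key 
> is read as a {{String}}.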
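The failure mode can be reproduced without Avro or NiFi. The following is a minimal sketch, not NiFi code: {{readKeysAsStrings}} is a hypothetical helper that mimics the quoted pattern, and {{StringBuilder}} stands in for {{Utf8}} because both are {{CharSequence}}s that are not {{java.lang.String}}. It shows that the unchecked cast passes, while reading a key into a {{String}} variable throws.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Utf8KeyCastDemo {

    /**
     * Hypothetical helper mimicking the quoted pattern: an unchecked cast to
     * Map<String, ?> followed by reading each key as a String.
     */
    @SuppressWarnings("unchecked")
    static List<String> readKeysAsStrings(Object value) {
        List<String> keys = new ArrayList<>();
        if (value instanceof Map) {
            // Succeeds whatever the real key type is: generics are erased at runtime.
            final Map<String, ?> map = (Map<String, ?>) value;
            for (Map.Entry<String, ?> entry : map.entrySet()) {
                // javac inserts a checkcast to String here; it fails for non-String keys.
                final String key = entry.getKey();
                keys.add(key);
            }
        }
        return keys;
    }

    /** Returns true if reading the keys as Strings throws ClassCastException. */
    static boolean failsWithClassCast(Object value) {
        try {
            readKeysAsStrings(value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // StringBuilder stands in for org.apache.avro.util.Utf8 (a non-String CharSequence).
        Map<CharSequence, Object> avroLikeMap = new HashMap<>();
        avroLikeMap.put(new StringBuilder("tags"), 1);

        Map<String, Object> stringKeyedMap = new HashMap<>();
        stringKeyedMap.put("tags", 1);

        System.out.println(failsWithClassCast(avroLikeMap));    // true
        System.out.println(failsWithClassCast(stringKeyedMap)); // false
    }
}
```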
> More generally, the problem is that an *Avro-specific object* now reaches code 
> that previously processed *only NiFi-specific value objects*.
> There are multiple approaches to fix this:
> # Treat this special case as a technical issue: accept that Avro objects can 
> leak into this layer and make the layer behave as needed, i.e. transform the 
> Avro map into one whose keys are {{String}} objects.
> # Treat this as an error-handling issue: inference can be a best-effort 
> attempt, and on error we fall back to the original logic. Inference was added 
> here to choose the best-matching type from a UNION/CHOICE. If inference yields 
> no result, the original logic goes over all types within the UNION/CHOICE and 
> selects the _first compatible_ one. When a Map is in a UNION/CHOICE, the other 
> types will not pose compatibility issues, so the original logic would work well.
> # Enhance the inference logic so that the Avro object is converted to a general 
> object before inference occurs. This would prevent Avro (or other third-party-
> specific) objects from leaking into the framework's format-agnostic layer.
> 
> (Options 1 and 2 are not mutually exclusive.)
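The options above can be sketched together in plain Java. This is an illustration under stated assumptions, not NiFi's API or the eventual fix. {{normalize}} shows the conversion idea behind options 1 and 3: it deep-copies a value and turns every {{CharSequence}} (which {{Utf8}} implements) into a {{String}}, assuming Avro arrays arrive as {{java.util.List}} implementations. {{chooseType}} shows option 2: best-effort inference that falls back to the first compatible type. {{Candidate}}, {{Inference}}, {{normalize}} and {{chooseType}} are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

public class UnionTypeSelectionSketch {

    /** Options 1/3 (hypothetical): deep-copy a value, turning every CharSequence into a String. */
    static Object normalize(Object value) {
        if (value instanceof CharSequence) {
            return value.toString();
        }
        if (value instanceof Map) {
            Map<String, Object> copy = new LinkedHashMap<>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                copy.put(String.valueOf(entry.getKey()), normalize(entry.getValue()));
            }
            return copy;
        }
        if (value instanceof List) {
            List<Object> copy = new ArrayList<>();
            for (Object element : (List<?>) value) {
                copy.add(normalize(element));
            }
            return copy;
        }
        return value;
    }

    /** Hypothetical stand-in for a UNION/CHOICE member type. */
    static final class Candidate {
        final String name;
        final Predicate<Object> isCompatible;

        Candidate(String name, Predicate<Object> isCompatible) {
            this.name = name;
            this.isCompatible = isCompatible;
        }
    }

    /** Hypothetical inference step; it may return empty or throw. */
    interface Inference {
        Optional<Candidate> infer(Object value, List<Candidate> candidates);
    }

    /** Option 2 (hypothetical): best-effort inference, then the first compatible candidate. */
    static Optional<Candidate> chooseType(Object value, List<Candidate> candidates, Inference inference) {
        try {
            Optional<Candidate> inferred = inference.infer(value, candidates);
            if (inferred.isPresent()) {
                return inferred;
            }
        } catch (RuntimeException e) {
            // Inference failed (e.g. ClassCastException); fall back to the original logic.
        }
        for (Candidate candidate : candidates) {
            if (candidate.isCompatible.test(value)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // StringBuilder stands in for Utf8 keys and values.
        Map<Object, Object> avroLike = new LinkedHashMap<>();
        avroLike.put(new StringBuilder("tags"), new StringBuilder("a"));

        System.out.println(normalize(avroLike)); // {tags=a}

        List<Candidate> candidates = Arrays.asList(
                new Candidate("string", v -> v instanceof CharSequence),
                new Candidate("map", v -> v instanceof Map));
        Inference failing = (v, c) -> {
            throw new ClassCastException("simulated inference failure");
        };
        System.out.println(chooseType(avroLike, candidates, failing).get().name); // map
    }
}
```

With normalization applied before inference, the inference step never sees non-{{String}} keys. The fallback is useful as a safety net whether or not normalization is in place.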
> ----
> *Issue report*
>  Severe regression in version 1.11.3 compared to 1.9.2:
> Record-based processors can no longer deserialize Avro messages. Examples:
>  * ConsumeKafkaRecord: with embedded Avro schema or using Confluent Schema 
> Registry
>  * ConvertRecord: with embedded Avro schema or using Confluent Schema 
> Registry, too
>  * probably others as well...
> Error messages:
> {noformat}
> ConvertRecord[id=c3ed29c6-0170-1000-a960-809827e7654d]
> Failed to process StandardFlowFileRecord[uuid=b3869d82-6c50-484e-8d0c-b64b5d5a3ac3,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1584002690648-1091, container=default, section=67], offset=276387, length=3487],offset=0,name=b3869d82-6c50-484e-8d0c-b64b5d5a3ac3,size=3487]; will route to failure:
> Could not parse incoming data{noformat}
> {noformat}
> ConsumeKafkaRecord_2_0[id=d9ebdbda-51b7-38ce-b43e-3197322bd2e1]
> Failed to parse message from Kafka using the configured Record Reader.
> Will route message as its own FlowFile to the 'parse.failure' relationship: 
> org.apache.nifi.serialization.MalformedRecordException:
> Error while getting next record. Root cause: java.lang.ClassCastException
> {noformat}
>  
> However, messages with an embedded schema can be converted to JSON 
> flawlessly using ConvertAvroToJSON.
>  
> The behavior has been confirmed with several different flows and 
> configurations, and with different Java versions. Downgrading to NiFi 1.9.2 
> resolves the issue; upgrading again to 1.11.3 brings it back.
>  
> A minimal example template is attached (AvroReader_bug_MWE.xml).
>  
> Stack traces:
>  
> {noformat}
> 2020-03-12 09:37:16,628 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.avro.AvroTypeUtil fail to convert field tags
> java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module java.base of loader 'bootstrap')
>         at org.apache.nifi.serialization.record.util.DataTypeUtils.inferRecordDataType(DataTypeUtils.java:544)
>         at org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType(DataTypeUtils.java:478)
>         at org.apache.nifi.serialization.record.util.DataTypeUtils.findMostSuitableType(DataTypeUtils.java:267)
>         at org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:882)
>         at org.apache.nifi.avro.AvroTypeUtil.normalizeValue(AvroTypeUtil.java:1004)
>         at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:857)
>         at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:830)
>         at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:45)
>         at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
>         at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:131)
>         at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:3006)
>         at org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:122)
>         at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
>         at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
>         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>         at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>         at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>         at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:834)
> 2020-03-12 09:37:16,632 ERROR [Timer-Driven Process Thread-4] o.a.n.processors.standard.ConvertRecord ConvertRecord[id=c3ed29c6-0170-1000-a960-809827e7654d] Failed to process StandardFlowFileRecord[uuid=33856f9d-1991-4c95-90c2-3ffd032fc840,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1584005835899-1, container=default, section=1], offset=851, length=3487],offset=0,name=33856f9d-1991-4c95-90c2-3ffd032fc840,size=3487]; will route to failure: org.apache.nifi.processor.exception.ProcessException: Could not parse incoming data
> org.apache.nifi.processor.exception.ProcessException: Could not parse incoming data
>         at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:171)
>         at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:3006)
>         at org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:122)
>         at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
>         at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
>         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>         at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>         at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>         at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.nifi.serialization.MalformedRecordException: Error while getting next record. Root cause: java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module java.base of loader 'bootstrap')
>         at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:52)
>         at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
>         at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:131)
>         ... 13 common frames omitted
> Caused by: java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module java.base of loader 'bootstrap')
>         at org.apache.nifi.serialization.record.util.DataTypeUtils.inferRecordDataType(DataTypeUtils.java:544)
>         at org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType(DataTypeUtils.java:478)
>         at org.apache.nifi.serialization.record.util.DataTypeUtils.findMostSuitableType(DataTypeUtils.java:267)
>         at org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:882)
>         at org.apache.nifi.avro.AvroTypeUtil.normalizeValue(AvroTypeUtil.java:1004)
>         at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:857)
>         at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:830)
>         at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:45)
>         ... 15 common frames omitted
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
