[
https://issues.apache.org/jira/browse/NIFI-7249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matt Burgess reassigned NIFI-7249:
----------------------------------
Assignee: Matt Burgess
> [Regression] AvroReader: Could not parse incoming data
> ------------------------------------------------------
>
> Key: NIFI-7249
> URL: https://issues.apache.org/jira/browse/NIFI-7249
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 1.10.0, 1.11.0, 1.11.1, 1.11.2, 1.11.3
> Environment: Debian, Java 11 and Java 8
> Reporter: Philipp Leufke
> Assignee: Matt Burgess
> Priority: Major
> Attachments: AvroReader_bug_MWE.xml
>
> Time Spent: 1h
> Remaining Estimate: 0h
>
> *Assessment*
> {code:java}
> AvroTypeUtil.convertUnionFieldValue
> {code}
> has the following:
> {code:java}
> Optional<Schema> mostSuitableType =
> DataTypeUtils.findMostSuitableType(
> originalValue,
> fieldSchema.getTypes().stream().filter(schema ->
> schema.getType() != Type.NULL).collect(Collectors.toList()),
> subSchema -> AvroTypeUtil.determineDataType(subSchema)
> );
> {code}
> {{DataTypeUtils.findMostSuitableType}} in turn has the following:
> {code:java}
> DataType inferredDataType = inferDataType(value, null);
> {code}
> and {{inferDataType}} in turn has the following:
> {code:java}
> if (value instanceof Map) {
> final Map<String, ?> map = (Map<String, ?>) value;
> {code}
> {{originalValue}} (passed on as {{value}}) is a map extracted from an Avro
> record whose keys are {{Utf8}} objects rather than {{String}}, so reading
> the keys as {{String}} fails with a {{ClassCastException}}.
> More generally, the problem is that we are dealing with an
> *Avro-specific object* where previously *only NiFi-specific value objects
> were processed*.
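A minimal sketch of why the cast quoted above compiles and only fails later. It is not NiFi code: the class and method names are hypothetical, and a StringBuilder stands in for Avro's Utf8 (both are CharSequence implementations that are not java.lang.String) so the example needs no Avro dependency.

```java
import java.util.HashMap;
import java.util.Map;

public class Utf8KeyCastDemo {
    // Stand-in for a map read from an Avro record: the key is a CharSequence
    // that is not a java.lang.String (StringBuilder here, Utf8 in the report).
    static Map<Object, Object> mapWithNonStringKey() {
        Map<Object, Object> map = new HashMap<>();
        map.put(new StringBuilder("tags"), "value");
        return map;
    }

    // Mirrors the cast quoted above. Returns true if reading the keys as
    // String throws a ClassCastException.
    @SuppressWarnings("unchecked")
    static boolean failsWhenKeysReadAsString(Object value) {
        // Unchecked cast: generics are erased, so this line never throws.
        final Map<String, ?> map = (Map<String, ?>) value;
        try {
            // The compiler inserts a cast to String for each key here,
            // and that is the point where a non-String key blows up.
            for (String key : map.keySet()) {
                key.length();
            }
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

The failure therefore surfaces wherever the keys are first used as String, which is consistent with the stack trace pointing at inferRecordDataType rather than at the cast itself.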
> There are multiple approaches to fix this:
> # Consider this special case a technical issue. We accept that Avro
> objects can leak into this layer and prepare the layer to handle them,
> i.e. transform the Avro map into another map whose keys are {{String}} objects.
> # Consider this an error-handling issue. Inference can be treated as a
> best-effort attempt, and in case of an error we can fall back to the original
> logic. Inference was added here to choose the best-matching type
> from a UNION/CHOICE. If inference doesn't yield a result, the original logic
> goes over all types within the UNION/CHOICE and selects the _first
> compatible_ one. When a Map is in a UNION/CHOICE, the other types will not
> pose compatibility issues, so the original logic would work well.
> (1. and 2. are not mutually exclusive.)
> # Enhance the inference logic so that the Avro object is converted to a general
> object before inference occurs. This would prevent Avro (or other
> third-party-specific) objects from leaking into the framework's
> format-agnostic layer.
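Approaches 1 and 2 from the list above could be sketched roughly as follows. This is an illustration under stated assumptions, not the actual NiFi fix: the class and method names are hypothetical, only the top-level keys are converted, and the inference step is passed in as a plain function rather than calling DataTypeUtils. It relies on Avro's Utf8 being a CharSequence whose toString() yields the decoded text.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class UnionInferenceSketch {
    // Approach 1: copy the map so that every key is a java.lang.String.
    // String.valueOf calls toString() on the key, which works for Utf8 and
    // any other CharSequence. Nested maps are not handled in this sketch.
    static Map<String, Object> withStringKeys(Map<?, ?> raw) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<?, ?> entry : raw.entrySet()) {
            out.put(String.valueOf(entry.getKey()), entry.getValue());
        }
        return out;
    }

    // Approach 2: treat inference as best effort. If it throws a
    // ClassCastException, report "no result" so the caller can fall back
    // to selecting the first compatible type of the union.
    static <T> Optional<T> inferOrEmpty(Object value,
                                        Function<Object, Optional<T>> inference) {
        try {
            return inference.apply(value);
        } catch (ClassCastException e) {
            return Optional.empty();
        }
    }
}
```

A caller could apply withStringKeys before inference and still wrap the inference call in inferOrEmpty, which matches the note that the first two approaches are not mutually exclusive.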
> ----
> *Issue report*
> Severe regression in version 1.11.3 compared to 1.9.2:
> Record-based processors can no longer deserialize Avro messages. Examples:
> * ConsumeKafkaRecord: with embedded Avro schema or using Confluent Schema
> Registry
> * ConvertRecord: with embedded Avro schema or using Confluent Schema
> Registry, too
> * probably others as well...
> Error messages:
> {noformat}
> ConvertRecord[id=c3ed29c6-0170-1000-a960-809827e7654d]
> Failed to process
> StandardFlowFileRecord[uuid=b3869d82-6c50-484e-8d0c-b64b5d5a3ac3,claim=StandardContentClaim
>
> [resourceClaim=StandardResourceClaim[id=1584002690648-1091,
> container=default, section=67], offset=276387, length=3487]
> ,offset=0,name=b3869d82-6c50-484e-8d0c-b64b5d5a3ac3,size=3487]; will route to
> failure:
> Could not parse incoming data{noformat}
> {noformat}
> ConsumeKafkaRecord_2_0[id=d9ebdbda-51b7-38ce-b43e-3197322bd2e1]
> Failed to parse message from Kafka using the configured Record Reader.
> Will route message as its own FlowFile to the 'parse.failure' relationship:
> org.apache.nifi.serialization.MalformedRecordException:
> Error while getting next record. Root cause: java.lang.ClassCastException
> {noformat}
>
> However, the messages with embedded schema can be converted to
> JSON without problems using ConvertAvroToJson.
>
> The behavior has been confirmed using several different flows and
> configurations with different Java versions. A downgrade to NiFi 1.9.2
> resolves the issue; a subsequent upgrade to 1.11.3 brings it back.
>
> Please find attached a minimal example template...
>
> Stack traces:
>
> {noformat}
> 2020-03-12 09:37:16,628 DEBUG [Timer-Driven Process Thread-4]
> org.apache.nifi.avro.AvroTypeUtil fail to convert field tags
> java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast
> to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of
> loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in
> module java.base of loader 'bootstrap')
> at
> org.apache.nifi.serialization.record.util.DataTypeUtils.inferRecordDataType(DataTypeUtils.java:544)
> at
> org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType(DataTypeUtils.java:478)
> at
> org.apache.nifi.serialization.record.util.DataTypeUtils.findMostSuitableType(DataTypeUtils.java:267)
> at
> org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:882)
> at
> org.apache.nifi.avro.AvroTypeUtil.normalizeValue(AvroTypeUtil.java:1004)
> at
> org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:857)
> at
> org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:830)
> at
> org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:45)
> at
> org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
> at
> org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:131)
> at
> org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:3006)
> at
> org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:122)
> at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
> at
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
> at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
> at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at
> java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
> at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:834)
> 2020-03-12 09:37:16,632 ERROR [Timer-Driven Process Thread-4]
> o.a.n.processors.standard.ConvertRecord
> ConvertRecord[id=c3ed29c6-0170-1000-a960-809827e7654d] Failed to process
> StandardFlowFileRecord[uuid=33856f9d-1991-4c95-90c2-3ffd032fc840,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1584005835899-1, container=default,
> section=1], offset=851,
> length=3487],offset=0,name=33856f9d-1991-4c95-90c2-3ffd032fc840,size=3487];
> will route to failure: org.apache.nifi.processor.exception.ProcessException:
> Could not parse incoming data
> org.apache.nifi.processor.exception.ProcessException: Could not parse
> incoming data
> at
> org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:171)
> at
> org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:3006)
> at
> org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:122)
> at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
> at
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
> at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
> at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at
> java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
> at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.nifi.serialization.MalformedRecordException: Error
> while getting next record. Root cause: java.lang.ClassCastException: class
> org.apache.avro.util.Utf8 cannot be cast to class java.lang.String
> (org.apache.avro.util.Utf8 is in unnamed module of loader
> org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module
> java.base of loader 'bootstrap')
> at
> org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:52)
> at
> org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
> at
> org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:131)
> ... 13 common frames omitted
> Caused by: java.lang.ClassCastException: class org.apache.avro.util.Utf8
> cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in
> unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3;
> java.lang.String is in module java.base of loader 'bootstrap')
> at
> org.apache.nifi.serialization.record.util.DataTypeUtils.inferRecordDataType(DataTypeUtils.java:544)
> at
> org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType(DataTypeUtils.java:478)
> at
> org.apache.nifi.serialization.record.util.DataTypeUtils.findMostSuitableType(DataTypeUtils.java:267)
> at
> org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:882)
> at
> org.apache.nifi.avro.AvroTypeUtil.normalizeValue(AvroTypeUtil.java:1004)
> at
> org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:857)
> at
> org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:830)
> at
> org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:45)
> ... 15 common frames omitted
> {noformat}
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)