[ https://issues.apache.org/jira/browse/NIFI-15027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18024134#comment-18024134 ]
Paul Grey commented on NIFI-15027:
----------------------------------
Thanks for the report. Based on the description, I was able to identify an
issue in the existing AvroWriter implementation. An Avro-specific error is
thrown when processing payloads that are invalid under the in-scope schema,
and that error is not handled by the "catch all" part of the Kafka record
handling.

I've pushed a proposed commit that adjusts the Avro handling, allowing
ConsumeKafka to do its job. If this changeset does not introduce any new
problems, I'll create a pull request.
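
To illustrate the kind of gap described above, here is a minimal sketch in
plain Java. It is not the NiFi code and not the proposed commit: the class
ParseFailureRouting, the RecordWriter interface, and both convert methods are
hypothetical names invented for this example. It assumes only what the trace
shows, namely that the writer fails with an unchecked exception (a
NullPointerException wrapped by Avro) rather than a checked one, so a handler
that catches only the checked type lets the error escape instead of diverting
the payload.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these types are NiFi or Avro APIs.
final class ParseFailureRouting {

    /** Stand-in for a record writer that may fail on invalid payloads. */
    interface RecordWriter {
        void write(String payload) throws IOException;
    }

    /** Simulates a writer that rejects a null required value with an unchecked exception. */
    static final RecordWriter STRICT_WRITER = payload -> {
        if (payload == null) {
            throw new IllegalStateException("null value for (non-nullable) field");
        }
    };

    final List<String> success = new ArrayList<>();
    final List<String> parseFailure = new ArrayList<>();

    /** Narrow handling: only IOException is caught, so unchecked errors abort the batch. */
    void convertNarrow(List<String> payloads, RecordWriter writer) {
        for (String payload : payloads) {
            try {
                writer.write(payload);
                success.add(payload);
            } catch (IOException e) {
                parseFailure.add(payload);
            }
        }
    }

    /** Broad handling: any per-record exception diverts that payload and the loop continues. */
    void convertBroad(List<String> payloads, RecordWriter writer) {
        for (String payload : payloads) {
            try {
                writer.write(payload);
                success.add(payload);
            } catch (Exception e) {
                parseFailure.add(payload);
            }
        }
    }

    public static void main(String[] args) {
        List<String> payloads = Arrays.asList("a", null, "b");
        ParseFailureRouting routing = new ParseFailureRouting();
        routing.convertBroad(payloads, STRICT_WRITER);
        System.out.println("success=" + routing.success);
        System.out.println("parseFailure=" + routing.parseFailure);
    }
}
```

With the narrow handler, the same input throws out of the loop on the second
payload, which matches the symptom in the report: an exception in the log
rather than a FlowFile on 'parse.failure'.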
> ConsumeKafka record reader NullPointerException
> -----------------------------------------------
>
> Key: NIFI-15027
> URL: https://issues.apache.org/jira/browse/NIFI-15027
> Project: Apache NiFi
> Issue Type: Bug
> Affects Versions: 2.5.0
> Reporter: Zenkovac
> Assignee: Paul Grey
> Priority: Major
>
> Using ConsumeKafka with Processing Strategy RECORD and a Record Reader whose
> schema has a non-nullable field, when a message arrives with that field set
> to null, NiFi throws a Java exception instead of routing the message to the
> parse failure queue.
>
> The docs state:
> ...if any of the Kafka messages are pulled but cannot be parsed or written
> with the configured Record Reader or Record Writer, the contents of the
> message will be written to a separate FlowFile, and that FlowFile will be
> transferred to the 'parse.failure' relationship...
>
> This is the exception produced:
> 2025-09-29 16:43:39,771 ERROR [Timer-Driven Process Thread-53]
> o.a.nifi.kafka.processors.ConsumeKafka
> ConsumeKafka[id=922f35bd-498c-3597-b22e-51a9bca18c51] Failed to consume Kafka
> Records
> java.lang.RuntimeException: Failed to process Kafka message
> at
> org.apache.nifi.kafka.processors.consumer.convert.AbstractRecordStreamKafkaMessageConverter.toFlowFiles(AbstractRecordStreamKafkaMessageConverter.java:119)
> at
> org.apache.nifi.kafka.processors.ConsumeKafka.processInputRecords(ConsumeKafka.java:598)
> at
> org.apache.nifi.kafka.processors.ConsumeKafka.processConsumerRecords(ConsumeKafka.java:561)
> at
> org.apache.nifi.kafka.processors.ConsumeKafka.onTrigger(ConsumeKafka.java:422)
> at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1276)
> at
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:220)
> at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
> at
> org.apache.nifi.engine.FlowEngine.lambda$wrap$1(FlowEngine.java:105)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
> at
> java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
> at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
> at java.base/java.lang.Thread.run(Thread.java:1583)
> Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException:
> java.lang.NullPointerException: null value for (non-nullable) long at
> ***OBFUSCATED***
> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:326)
> at
> org.apache.nifi.avro.WriteAvroResultWithSchema.writeRecord(WriteAvroResultWithSchema.java:61)
> at
> org.apache.nifi.serialization.AbstractRecordSetWriter.write(AbstractRecordSetWriter.java:59)
> at
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> at java.base/java.lang.reflect.Method.invoke(Method.java:580)
> at
> org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:251)
> at
> org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:237)
> at jdk.proxy236/jdk.proxy236.$Proxy449.write(Unknown Source)
> at
> org.apache.nifi.kafka.processors.consumer.convert.AbstractRecordStreamKafkaMessageConverter.processSingleRecord(AbstractRecordStreamKafkaMessageConverter.java:180)
> at
> org.apache.nifi.kafka.processors.consumer.convert.AbstractRecordStreamKafkaMessageConverter.toFlowFiles(AbstractRecordStreamKafkaMessageConverter.java:110)
> ... 14 common frames omitted
> Caused by: java.lang.NullPointerException: null value for (non-nullable) long
> at ***OBFUSCATED***
> at
> org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:88)
> at
> org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:30)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:84)
> at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:323)
> ... 23 common frames omitted
> Caused by: java.lang.NullPointerException: Cannot invoke
> "java.lang.Number.longValue()" because "datum" is null
> at
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:174)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95)
> at
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:245)
> at
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234)
> at
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
> ... 24 common frames omitted
--
This message was sent by Atlassian Jira
(v8.20.10#820010)