[ https://issues.apache.org/jira/browse/NIFI-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
olivier brobecker resolved NIFI-14247.
--------------------------------------
Fix Version/s: 2.1.0
Resolution: Fixed
This issue has been fixed as part of NIFI-14123.
> tombstone messages in consumeKafka processor
> --------------------------------------------
>
> Key: NIFI-14247
> URL: https://issues.apache.org/jira/browse/NIFI-14247
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 2.0.0
> Environment: kubernetes using apache/nifi docker image
> Reporter: olivier brobecker
> Priority: Major
> Fix For: 2.1.0
>
>
> When using the ConsumeKafka 2.0.0 processor from the nifi-kafka-nar bundle
> with Processing Strategy set to FLOW_FILE or DEMARCATOR, the processor fails
> to consume a Kafka record that has no value (a tombstone message).
> When such a record is read, a FlowFile should be generated with no content
> and with the attribute kafka.tombstone set to true.
>
> Instead, the processor yields and the following error appears in the logs:
> {code:java}
> 2025-02-07 06:29:46,619 ERROR [Timer-Driven Process Thread-10] o.a.nifi.kafka.processors.ConsumeKafka ConsumeKafka[id=df115ac8-0194-1000-ffff-fffffa8cb386] Failed to consume Kafka Records
> java.lang.NullPointerException: Value required
> at java.base/java.util.Objects.requireNonNull(Objects.java:259)
> at org.apache.nifi.kafka.service.api.record.ByteRecord.<init>(ByteRecord.java:55)
> at org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService$RecordIterator.next(Kafka3ConsumerService.java:195)
> at org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService$RecordIterator.next(Kafka3ConsumerService.java:167)
> at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> at java.base/java.lang.reflect.Method.invoke(Method.java:580)
> at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:251)
> at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:237)
> at jdk.proxy12/jdk.proxy12.$Proxy213.next(Unknown Source)
> at org.apache.nifi.kafka.processors.consumer.convert.FlowFileStreamKafkaMessageConverter.toFlowFiles(FlowFileStreamKafkaMessageConverter.java:59)
> at org.apache.nifi.kafka.processors.ConsumeKafka.processInputFlowFile(ConsumeKafka.java:489)
> at org.apache.nifi.kafka.processors.ConsumeKafka.processConsumerRecords(ConsumeKafka.java:437)
> at org.apache.nifi.kafka.processors.ConsumeKafka.onTrigger(ConsumeKafka.java:376)
> at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1274)
> at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:244)
> at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
> at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
> at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
> at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
> at java.base/java.lang.Thread.run(Thread.java:1583)
> {code}
>
> Kafka3ConnectionService is configured to connect to an internal Kubernetes
> Kafka cluster.
>
>
>
>
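For readers following the report above, here is a minimal, dependency-free sketch of the failure and of the behavior the reporter expected. It is not the patch from NIFI-14123, and it does not use NiFi or Kafka classes: `TombstoneSketch`, `ConsumedRecord`, `SketchFlowFile`, `strictValue` and `toFlowFile` are hypothetical names introduced only for illustration. The only details taken from the report are the "Value required" null check seen in the stack trace and the expected `kafka.tombstone` attribute on an empty FlowFile.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative sketch only; all type and method names here are hypothetical.
final class TombstoneSketch {

    // Stand-in for a consumed Kafka record; a null value marks a tombstone.
    record ConsumedRecord(byte[] key, byte[] value) { }

    // Stand-in for a FlowFile: content bytes plus string attributes.
    record SketchFlowFile(byte[] content, Map<String, String> attributes) { }

    // Mirrors the reported failure: a strict null check on the record value
    // throws NullPointerException("Value required") for a tombstone.
    static byte[] strictValue(ConsumedRecord record) {
        return Objects.requireNonNull(record.value(), "Value required");
    }

    // Expected behavior per the report: a tombstone yields an empty FlowFile
    // with the attribute kafka.tombstone set to true.
    static SketchFlowFile toFlowFile(ConsumedRecord record) {
        Map<String, String> attributes = new LinkedHashMap<>();
        byte[] content;
        if (record.value() == null) {
            attributes.put("kafka.tombstone", "true");
            content = new byte[0];
        } else {
            content = record.value();
        }
        return new SketchFlowFile(content, attributes);
    }

    public static void main(String[] args) {
        ConsumedRecord tombstone = new ConsumedRecord("some-key".getBytes(), null);

        try {
            strictValue(tombstone);
        } catch (NullPointerException e) {
            System.out.println("strict check failed: " + e.getMessage());
        }

        SketchFlowFile flowFile = toFlowFile(tombstone);
        System.out.println("content length: " + flowFile.content().length);
        System.out.println("attributes: " + flowFile.attributes());
    }
}
```

The sketch treats a null value as data to be represented rather than as an error, which matches what the reporter asked for; how the actual fix structures this inside the Kafka service and converter classes is not stated in this message.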
--
This message was sent by Atlassian Jira
(v8.20.10#820010)