[ 
https://issues.apache.org/jira/browse/NIFI-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

olivier brobecker resolved NIFI-14247.
--------------------------------------
    Fix Version/s: 2.1.0
       Resolution: Fixed

issue has been fixed with issue NIFI-14123

> tombstone messages in consumeKafka processor
> --------------------------------------------
>
>                 Key: NIFI-14247
>                 URL: https://issues.apache.org/jira/browse/NIFI-14247
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 2.0.0
>         Environment: kubernetes using apache/nifi docker image
>            Reporter: olivier brobecker
>            Priority: Major
>             Fix For: 2.1.0
>
>
> when using the following parameters for processor consumeKafka 2.0.0 from 
> bundle nifi-kafka-nar with processing strategy set to FLOW_FILE or 
> DEMARCATOR, the processor failed to consume a kafka document if this one has 
> no messages ( a tombstone message ).
> When reading document, a flow file should have been generated with no content 
> and the attribute kafka.tombstone set to true.
>  
> Instead, the processor yield and the following error can be seen in the logs :
> {code:java}
> 2025-02-07 06:29:46,619 ERROR [Timer-Driven Process Thread-10] 
> o.a.nifi.kafka.processors.ConsumeKafka 
> ConsumeKafka[id=df115ac8-0194-1000-ffff-fffffa8cb386] Failed to consume Kafka 
> Records
> java.lang.NullPointerException: Value required
>         at java.base/java.util.Objects.requireNonNull(Objects.java:259)
>         at 
> org.apache.nifi.kafka.service.api.record.ByteRecord.<init>(ByteRecord.java:55)
>         at 
> org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService$RecordIterator.next(Kafka3ConsumerService.java:195)
>         at 
> org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService$RecordIterator.next(Kafka3ConsumerService.java:167)
>         at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
>         at java.base/java.lang.reflect.Method.invoke(Method.java:580)
>         at 
> org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:251)
>         at 
> org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:237)
>         at jdk.proxy12/jdk.proxy12.$Proxy213.next(Unknown Source)
>         at 
> org.apache.nifi.kafka.processors.consumer.convert.FlowFileStreamKafkaMessageConverter.toFlowFiles(FlowFileStreamKafkaMessageConverter.java:59)
>         at 
> org.apache.nifi.kafka.processors.ConsumeKafka.processInputFlowFile(ConsumeKafka.java:489)
>         at 
> org.apache.nifi.kafka.processors.ConsumeKafka.processConsumerRecords(ConsumeKafka.java:437)
>         at 
> org.apache.nifi.kafka.processors.ConsumeKafka.onTrigger(ConsumeKafka.java:376)
>         at 
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>         at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1274)
>         at 
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:244)
>         at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
>         at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>         at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
>         at 
> java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
>         at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
>         at java.base/java.lang.Thread.run(Thread.java:1583) {code}
>  
> kafka3ConnectionService is configured to connect to an internal kubernetes 
> kafka cluster
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to