[ 
https://issues.apache.org/jira/browse/KAFKA-7434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-7434.
------------------------------------------
       Resolution: Fixed
    Fix Version/s: 2.1.0
                   2.0.1

Issue resolved by pull request 5700
[https://github.com/apache/kafka/pull/5700]

> DeadLetterQueueReporter throws NPE if transform throws NPE
> ----------------------------------------------------------
>
>                 Key: KAFKA-7434
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7434
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.0.0
>         Environment: jdk 8
>            Reporter: Michal Borowiecki
>            Assignee: Michal Borowiecki
>            Priority: Major
>             Fix For: 2.0.1, 2.1.0
>
>
> A NPE thrown from a transform in a connector configured with
> errors.deadletterqueue.context.headers.enable=true
> causes DeadLetterQueueReporter to break with a NPE.
> {code}
> Executing stage 'TRANSFORMATION' with class 
> 'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is 
> {topic='****', partition=1, offset=0, timestamp=1537370573366, 
> timestampType=CreateTime}. 
> (org.apache.kafka.connect.runtime.errors.LogReporter)
> java.lang.NullPointerException
> Task threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> java.lang.NullPointerException
>       at 
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)
>       at 
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)
>       at 
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)
>       at 
> org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
>       at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
>       at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
>       at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)
>       at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
>       at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
>       at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
>       at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
>       at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> This is caused by populateContextHeaders only checking if the Throwable is 
> not null, but not checking that the message in the Throwable is not null 
> before trying to serialize the message:
> [https://github.com/apache/kafka/blob/cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L170-L177]
> {code:java}
> if (context.error() != null) {
>      headers.add(ERROR_HEADER_EXCEPTION, 
> toBytes(context.error().getClass().getName()));
>      headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, 
> toBytes(context.error().getMessage()));
> {code}
> toBytes throws an NPE if passed null as the parameter.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to