[ https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423452#comment-16423452 ]
Randall Hauch commented on KAFKA-6728:
--------------------------------------

Testing with a test source connector reveals the same problem:

{noformat}
[2018-04-02 21:59:28,357] INFO Record: SourceRecord{sourcePartition={my-source=0}, sourceOffset={offset=1}} ConnectRecord{topic='my-topic', kafkaPartition=null, key=key-1, value=value-1, timestamp=null, headers=ConnectHeaders(headers=[ConnectHeader(key=my.header, value=MyHeaderValue, schema=Schema{STRING})])} (com.mycompany.examples.MySourceTask:79)
[2018-04-02 21:59:28,358] INFO MySourceTask.poll() - returning 1 record (com.mycompany.examples.MySourceTask:54)
[2018-04-02 21:59:28,359] INFO WorkerSourceTask{id=my-test-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-04-02 21:59:28,359] INFO WorkerSourceTask{id=my-test-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-04-02 21:59:28,360] ERROR WorkerSourceTask{id=my-test-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.NullPointerException
	at org.apache.kafka.connect.runtime.WorkerSourceTask.convertHeaderFor(WorkerSourceTask.java:296)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
[2018-04-02 21:59:28,361] ERROR WorkerSourceTask{id=my-test-source-0} Task is being killed and
will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
{noformat}

Adding the following to the connector configuration results in a successful run:
{code}
header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter
{code}
This demonstrates that the problem is not with the message headers themselves (this test connector writes headers correctly via the Connect API), but rather that the NPE results from the worker failing to properly instantiate the {{HeaderConverter}}.

I then removed the {{header.converter}} property from the connector configuration, temporarily replaced {{connect-runtime-1.1.0.jar}} with one built from my aforementioned PR, and manually tested the custom source connector (which outputs header values) against 8 different combinations of the {{header.converter}} configuration settings:
# default value
# worker configuration has {{header.converter}} explicitly set to the default
# worker configuration has {{header.converter}} set to a custom {{HeaderConverter}} implementation in the same plugin
# worker configuration has {{header.converter}} set to a custom {{HeaderConverter}} implementation in a _different_ plugin
# connector configuration has {{header.converter}} explicitly set to the default
# connector configuration has {{header.converter}} set to a custom {{HeaderConverter}} implementation in the same plugin
# connector configuration has {{header.converter}} set to a custom {{HeaderConverter}} implementation in a _different_ plugin
# worker configuration has {{header.converter}} explicitly set to the default, and the connector configuration has {{header.converter}} set to a custom {{HeaderConverter}} implementation in a _different_ plugin

The worker created the correct {{HeaderConverter}} implementation with the correct configuration in all of these tests.
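For illustration, the worker-level and connector-level settings exercised in these combinations boil down to a single property in the respective properties file (a sketch; {{com.example.CustomHeaderConverter}} is a hypothetical class name standing in for the custom implementations used in the tests):
{code}
# explicit default (in the worker or the connector configuration)
header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter

# custom implementation, loaded from the same or a different plugin
header.converter=com.example.CustomHeaderConverter
{code}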
Finally, the default configuration was used with the aforementioned custom source connector that generated records with headers, and an S3 connector that consumed the records with headers (but didn't do anything with them). This test also passed.

> Kafka Connect Header Null Pointer Exception
> -------------------------------------------
>
>                 Key: KAFKA-6728
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6728
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.1.0
>        Environment: Linux Mint
>            Reporter: Philippe Hong
>            Priority: Critical
>
> I am trying to use the newly released Kafka Connect that supports headers by
> using the standalone connector to write to a text file (so in this case I am
> only using the sink component).
> I am sadly greeted by a NullPointerException:
> {noformat}
> ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and
> unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
> java.lang.NullPointerException
> at org.apache.kafka.connect.runtime.WorkerSinkTask.convertHeadersFor(WorkerSinkTask.java:501)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {noformat}
> I
> launched zookeeper and kafka 1.1.0 locally and sent a
> ProducerRecord[String, Array[Byte]] using a KafkaProducer[String,
> Array[Byte]] with a header that has a key and value.
> I can read the record with a console consumer as well as using a
> KafkaConsumer (where in this case I can see the content of the header of the
> record I sent previously), so no problem here.
> I only made two changes to the kafka configuration:
> - I used the StringConverter for the key and the ByteArrayConverter for
>   the value.
> - I also changed the topic the sink would connect to.
> If I forgot something, please tell me, as it is the first time I am creating
> an issue on Jira.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
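For reference, the kind of record the reporter describes (String key, byte-array value, one header with a key and value) can be constructed with the Java client roughly as follows. This is a sketch only; the topic, key, and header names are placeholders, and the record would still need to be sent to a running broker via a {{KafkaProducer}}:

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import java.nio.charset.StandardCharsets;

public class HeaderRecordExample {

    // Builds a record carrying one header, mirroring the reporter's setup.
    // Topic, key, value, and header names are hypothetical placeholders.
    static ProducerRecord<String, byte[]> recordWithHeader() {
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(
                "my-topic", "key-1", "value-1".getBytes(StandardCharsets.UTF_8));
        // Attach a header with a key and a value.
        record.headers().add("my.header",
                "MyHeaderValue".getBytes(StandardCharsets.UTF_8));
        return record;
    }

    public static void main(String[] args) {
        Header h = recordWithHeader().headers().lastHeader("my.header");
        System.out.println(new String(h.value(), StandardCharsets.UTF_8));
    }
}
```

Consuming such a record with a plain {{KafkaConsumer}} exposes the same header via {{ConsumerRecord.headers()}}, which is why the reporter could see the header content outside of Connect.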