[ 
https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423452#comment-16423452
 ] 

Randall Hauch commented on KAFKA-6728:
--------------------------------------

Testing with a test source connector reveals the same problem:

{noformat}
[2018-04-02 21:59:28,357] INFO Record: SourceRecord{sourcePartition={my-source=0}, sourceOffset={offset=1}} ConnectRecord{topic='my-topic', kafkaPartition=null, key=key-1, value=value-1, timestamp=null, headers=ConnectHeaders(headers=[ConnectHeader(key=my.header, value=MyHeaderValue, schema=Schema{STRING})])} (com.mycompany.examples.MySourceTask:79)
[2018-04-02 21:59:28,358] INFO MySourceTask.poll() - returning 1 record (com.mycompany.examples.MySourceTask:54)
[2018-04-02 21:59:28,359] INFO WorkerSourceTask{id=my-test-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-04-02 21:59:28,359] INFO WorkerSourceTask{id=my-test-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-04-02 21:59:28,360] ERROR WorkerSourceTask{id=my-test-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.NullPointerException
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertHeaderFor(WorkerSourceTask.java:296)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
[2018-04-02 21:59:28,361] ERROR WorkerSourceTask{id=my-test-source-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
{noformat}

Adding the following to the connector configuration results in a successful run:

{code}
header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter
{code}

This demonstrates that the problem is not with the message headers (since this 
test connector is writing headers correctly using the Connect API), but instead 
that the NPE results from the Worker not properly instantiating the 
HeaderConverter.
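
To make the failure mode concrete, here is a minimal sketch of what an uninstantiated converter looks like from the task's point of view. The types below ({{SketchHeaderConverter}}, {{SketchWorkerTask}}) are hypothetical stand-ins invented for illustration; this is not the actual {{WorkerSourceTask}} code, only an assumption about the general shape of the problem.

```java
import java.nio.charset.StandardCharsets;

// Illustrative sketch only: these types are hypothetical stand-ins,
// not the actual Kafka Connect runtime classes.
class NullHeaderConverterSketch {

    // Hypothetical, simplified analogue of Connect's HeaderConverter.
    interface SketchHeaderConverter {
        byte[] fromConnectHeader(String headerKey, String value);
    }

    // Hypothetical, simplified analogue of a worker task.
    static class SketchWorkerTask {
        // Null when the worker never instantiated a converter.
        private final SketchHeaderConverter headerConverter;

        SketchWorkerTask(SketchHeaderConverter headerConverter) {
            this.headerConverter = headerConverter;
        }

        byte[] convertHeader(String headerKey, String value) {
            // Dereferences the field without a null check, so a missing
            // converter surfaces as a NullPointerException here.
            return headerConverter.fromConnectHeader(headerKey, value);
        }
    }

    // Returns true if converting a header with the given task throws an NPE.
    static boolean failsWithNpe(SketchWorkerTask task) {
        try {
            task.convertHeader("my.header", "MyHeaderValue");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        SketchWorkerTask broken = new SketchWorkerTask(null);
        SketchWorkerTask working = new SketchWorkerTask(
                (key, value) -> value.getBytes(StandardCharsets.UTF_8));
        System.out.println("no converter -> NPE: " + failsWithNpe(broken));
        System.out.println("with converter -> NPE: " + failsWithNpe(working));
    }
}
```

In this sketch the header itself is identical in both cases; only the presence of a converter instance changes the outcome, which mirrors the observation that the records and headers are well-formed.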

I then removed the {{header.converter}} property from the connector 
configuration, temporarily replaced {{connect-runtime-1.1.0.jar}} with one 
built from my aforementioned PR, and manually tested with a custom source 
connector that outputs header values, using 8 different combinations of 
{{header.converter}} configuration settings:

#  default value
#  worker configuration has {{header.converter}} explicitly set to the default
#  worker configuration has {{header.converter}} set to a custom 
{{HeaderConverter}} implementation in the same plugin
#  worker configuration has {{header.converter}} set to a custom 
{{HeaderConverter}} implementation in a _different_ plugin
#  connector configuration has {{header.converter}} explicitly set to the 
default
#  connector configuration has {{header.converter}} set to a custom 
{{HeaderConverter}} implementation in the same plugin
#  connector configuration has {{header.converter}} set to a custom 
{{HeaderConverter}} implementation in a _different_ plugin
#  worker configuration has {{header.converter}} explicitly set to the default, 
and the connector configuration has {{header.converter}} set to a custom 
{{HeaderConverter}} implementation in a _different_ plugin

The worker created the correct {{HeaderConverter}} implementation with the 
correct configuration in all of these tests.
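
The outcome expected across these combinations can be sketched as a simple precedence rule: a connector-level {{header.converter}} wins over a worker-level one, and {{SimpleHeaderConverter}} is used when neither is set. The class and method names below are hypothetical and exist only for illustration, and {{com.mycompany.examples.MyHeaderConverter}} is a made-up class name; the sketch deliberately ignores plugin class loading, which the real worker also has to handle.

```java
import java.util.Collections;
import java.util.Map;

// Illustrative sketch only: a hypothetical helper, not the Connect runtime's code.
// It ignores plugin isolation and class loading entirely.
class HeaderConverterResolutionSketch {
    static final String HEADER_CONVERTER_KEY = "header.converter";
    static final String DEFAULT_CONVERTER =
            "org.apache.kafka.connect.storage.SimpleHeaderConverter";

    // Picks the converter class name: connector config first,
    // then worker config, then the default.
    static String resolve(Map<String, String> workerProps,
                          Map<String, String> connectorProps) {
        String fromConnector = connectorProps.get(HEADER_CONVERTER_KEY);
        if (fromConnector != null) {
            return fromConnector;
        }
        String fromWorker = workerProps.get(HEADER_CONVERTER_KEY);
        if (fromWorker != null) {
            return fromWorker;
        }
        return DEFAULT_CONVERTER;
    }

    public static void main(String[] args) {
        // Made-up custom converter class name, used only for this example.
        String custom = "com.mycompany.examples.MyHeaderConverter";
        Map<String, String> unset = Collections.emptyMap();
        Map<String, String> explicitDefault =
                Collections.singletonMap(HEADER_CONVERTER_KEY, DEFAULT_CONVERTER);
        Map<String, String> customSet =
                Collections.singletonMap(HEADER_CONVERTER_KEY, custom);

        // Nothing set anywhere: falls back to the default.
        System.out.println(resolve(unset, unset));
        // Worker sets a custom converter, connector is silent.
        System.out.println(resolve(customSet, unset));
        // Worker sets the default explicitly, connector overrides it.
        System.out.println(resolve(explicitDefault, customSet));
    }
}
```

The last call corresponds to the eighth combination in the list, where the connector-level setting is expected to override the worker-level one.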

Finally, the default configuration was used with the aforementioned custom 
source connector that generated records with headers, and with an S3 connector 
that consumed those records (but did nothing with the headers). This test 
also passed.


> Kafka Connect Header Null Pointer Exception
> -------------------------------------------
>
>                 Key: KAFKA-6728
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6728
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.1.0
>         Environment: Linux Mint
>            Reporter: Philippe Hong
>            Priority: Critical
>
> I am trying to use the newly released Kafka Connect that supports headers by 
> using the standalone connector to write to a text file (so in this case I am 
> only using the sink component)
> I am sadly greeted by a NullPointerException :
> {noformat}
> ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
> java.lang.NullPointerException
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertHeadersFor(WorkerSinkTask.java:501)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> I launched zookeeper and kafka 1.1.0 locally and sent a 
> ProducerRecord[String, Array[Byte]] using a KafkaProducer[String, 
> Array[Byte]] with a header that has a key and value.
> I can read the record with a console consumer as well as using a 
> KafkaConsumer (where in this case I can see the content of the header of the 
> record I sent previously) so no problem here.
> I only made two changes to the kafka configuration:
>      - I used the StringConverter for the key and the ByteArrayConverter for 
> the value. 
>      - I also changed the topic where the sink would connect to.
> If I forgot something please tell me so as it is the first time I am creating 
> an issue on Jira.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
