[ https://issues.apache.org/jira/browse/KAFKA-6133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Elizabeth Bennett updated KAFKA-6133:
-------------------------------------
    Description: 
I just tried out the new {{rotate.interval.ms}} feature in the S3 connector to do time-based flushing. I am getting this NPE on every event:

{code}[2017-10-20 23:21:35,233] ERROR Task foo-to-S3-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.NullPointerException
        at io.confluent.connect.s3.TopicPartitionWriter.rotateOnTime(TopicPartitionWriter.java:288)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:234)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
[2017-10-20 23:21:35,233] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2017-10-20 23:21:35,233] ERROR Task foo-to-S3-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748){code}

I dug into the S3 connector code a bit, and it looks like the {{rotate.interval.ms}} feature only works if you are using the {{TimeBasedPartitioner}}. The writer gets the {{TimestampExtractor}} from the {{TimeBasedPartitioner}} to determine each event's timestamp and uses that timestamp for time-based flushing. Presumably, with any other partitioner there is no extractor, which would explain the NPE in {{rotateOnTime}} above.
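
To make the failure mode concrete, here is a simplified, hypothetical model of record-time rotation. It is not the connector's actual {{TopicPartitionWriter}} code: {{RecordTimeRotation}} is an illustrative name, and a plain {{java.util.function.ToLongFunction}} stands in for the partitioner's {{TimestampExtractor}}. It assumes a file is rotated once the current record's timestamp is at least {{rotate.interval.ms}} past the timestamp of the first record in the open file. If no extractor is supplied, the very first call dereferences {{null}}.

```java
import java.util.function.ToLongFunction;

/**
 * Hypothetical, simplified model of record-time rotation.
 * Not the connector's TopicPartitionWriter code; names are illustrative.
 */
final class RecordTimeRotation<R> {
  private final long rotateIntervalMs;
  // Stand-in for the TimestampExtractor obtained from TimeBasedPartitioner.
  private final ToLongFunction<R> timestampOf;
  // Timestamp of the first record in the currently open file; -1 means none yet.
  private long fileStartTs = -1L;

  RecordTimeRotation(long rotateIntervalMs, ToLongFunction<R> timestampOf) {
    this.rotateIntervalMs = rotateIntervalMs;
    this.timestampOf = timestampOf;
  }

  /** Returns true if the open file should be committed before this record is written. */
  boolean shouldRotate(R record) {
    // With no extractor (timestampOf == null) this line throws a
    // NullPointerException, analogous to the NPE reported in rotateOnTime.
    long ts = timestampOf.applyAsLong(record);
    if (fileStartTs < 0) {
      fileStartTs = ts; // the first record opens the file
      return false;
    }
    if (ts - fileStartTs >= rotateIntervalMs) {
      fileStartTs = ts; // this record starts the next file
      return true;
    }
    return false;
  }
}
```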

I'm using a custom partitioner, but I'd still like to use the {{rotate.interval.ms}} feature, with wall-clock time determining when files are flushed.
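
A minimal sketch of the wall-clock alternative, assuming rotation is decided by processing time rather than by anything inside the record, so it would work with any partitioner. {{WallClockRotation}} is a hypothetical name, and the clock is injected as a {{LongSupplier}} (in production, {{System::currentTimeMillis}}) so the behavior can be tested deterministically.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch of wall-clock rotation: the decision depends only on
 * processing time, not on the record or the partitioner.
 */
final class WallClockRotation {
  private final long rotateIntervalMs;
  private final LongSupplier nowMs;   // e.g. System::currentTimeMillis
  private long fileOpenedAtMs = -1L;  // -1 means no file is open yet

  WallClockRotation(long rotateIntervalMs, LongSupplier nowMs) {
    this.rotateIntervalMs = rotateIntervalMs;
    this.nowMs = nowMs;
  }

  /** Call before writing each record; true means commit the open file first. */
  boolean shouldRotate() {
    long now = nowMs.getAsLong();
    if (fileOpenedAtMs < 0) {
      fileOpenedAtMs = now; // the first record opens the file
      return false;
    }
    if (now - fileOpenedAtMs >= rotateIntervalMs) {
      fileOpenedAtMs = now; // the record being written opens the next file
      return true;
    }
    return false;
  }
}
```

Note that this check only runs when a record arrives, so on an idle partition the open file would stay uncommitted until the next write; a real implementation would probably need an additional periodic check.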

I'd be willing to work on a fix, but first I want to confirm that this is actually a bug and not a deliberate restriction to the {{TimeBasedPartitioner}}. Even if it is the latter, it should probably not crash with an NPE.
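
One way to avoid the per-record NPE, sketched under assumptions: resolve the time source once when the task starts, then either fall back to wall-clock time or fail immediately with a descriptive error. {{RotationTimeSource}}, {{choose}} and the {{allowWallClockFallback}} switch are hypothetical, not existing connector settings or APIs, and the error message is illustrative.

```java
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;

/** Hypothetical helper that decides, once at startup, where rotation timestamps come from. */
final class RotationTimeSource {
  private RotationTimeSource() {}

  /**
   * @param extractor              the partitioner's extractor, or null if the
   *                               partitioner (e.g. a custom one) provides none
   * @param allowWallClockFallback hypothetical switch: use processing time when
   *                               no extractor is available
   * @param wallClockMs            source of the current time in milliseconds
   */
  static <R> ToLongFunction<R> choose(ToLongFunction<R> extractor,
                                      boolean allowWallClockFallback,
                                      LongSupplier wallClockMs) {
    if (extractor != null) {
      return extractor; // record-time rotation, as with TimeBasedPartitioner
    }
    if (allowWallClockFallback) {
      return record -> wallClockMs.getAsLong(); // ignore the record, use processing time
    }
    // Fail fast with an actionable message instead of an NPE on the first record.
    throw new IllegalArgumentException(
        "rotate.interval.ms needs a partitioner that provides a timestamp extractor "
            + "(such as TimeBasedPartitioner), or wall-clock fallback must be enabled");
  }
}
```

Failing at startup would surface the misconfiguration once, with a clear message, instead of killing the task on the first record it processes.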


> NullPointerException in S3 Connector when using rotate.interval.ms
> ------------------------------------------------------------------
>
>                 Key: KAFKA-6133
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6133
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.0
>            Reporter: Elizabeth Bennett
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
