[
https://issues.apache.org/jira/browse/STORM-2207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15886493#comment-15886493
]
Adrianos Dadis commented on STORM-2207:
---------------------------------------
This is still a problem in 1.0.3 release (try to migrate from 0.9.6).
This problem occurs when Kafka Coordinator decides to rebalance for some
reason. Then KafkaSpout throw NPE because does not know what to do in acked Map
for the incoming acked tuple.
STORM-2104 seems to handle that problem, but it is only available for 2.x and
1.1.x branches.
> Kafka Spout NullPointerException during ack
> -------------------------------------------
>
> Key: STORM-2207
> URL: https://issues.apache.org/jira/browse/STORM-2207
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka
> Affects Versions: 1.0.2
> Reporter: Nick Cuneo
> Priority: Critical
>
> This occurs on startup of the topology. There should be some null check
> safeguards, but i'm not sure what's causing it to occur in the first
> place...my guess is the topic partition is not found in the ack map.
> 2016-11-17 23:11:05.366 o.a.s.util [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
> at
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464)
> ~[storm-core-1.0.2.jar:1.0.2]
> at
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430)
> ~[storm-core-1.0.2.jar:1.0.2]
> at
> org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420)
> ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69)
> ~[storm-core-1.0.2.jar:1.0.2]
> at
> org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628)
> ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484)
> [storm-core-1.0.2.jar:1.0.2]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> Caused by: java.lang.NullPointerException
> at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316)
> ~[stormjar.jar:?]
> at
> org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448)
> ~[storm-core-1.0.2.jar:1.0.2]
> at
> org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536)
> ~[storm-core-1.0.2.jar:1.0.2]
> at
> org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464)
> ~[storm-core-1.0.2.jar:1.0.2]
> at
> org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40)
> ~[storm-core-1.0.2.jar:1.0.2]
> at
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451)
> ~[storm-core-1.0.2.jar:1.0.2]
> ... 7 more
> 2016-11-17 23:11:05.379 o.a.s.d.executor [ERROR]
> java.lang.RuntimeException: java.lang.NullPointerException
> at
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464)
> ~[storm-core-1.0.2.jar:1.0.2]
> at
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430)
> ~[storm-core-1.0.2.jar:1.0.2]
> at
> org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420)
> ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69)
> ~[storm-core-1.0.2.jar:1.0.2]
> at
> org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628)
> ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484)
> [storm-core-1.0.2.jar:1.0.2]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> Caused by: java.lang.NullPointerException
> at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316)
> ~[stormjar.jar:?]
> at
> org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448)
> ~[storm-core-1.0.2.jar:1.0.2]
> at
> org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536)
> ~[storm-core-1.0.2.jar:1.0.2]
> at
> org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464)
> ~[storm-core-1.0.2.jar:1.0.2]
> at
> org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40)
> ~[storm-core-1.0.2.jar:1.0.2]
> at
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451)
> ~[storm-core-1.0.2.jar:1.0.2]
> ... 7 more
> 2016-11-17 23:11:05.473 o.a.s.util [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
> at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341)
> [storm-core-1.0.2.jar:1.0.2]
> at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
> at
> org.apache.storm.daemon.worker$fn__8663$fn__8664.invoke(worker.clj:765)
> [storm-core-1.0.2.jar:1.0.2]
> at
> org.apache.storm.daemon.executor$mk_executor_data$fn__7875$fn__7876.invoke(executor.clj:274)
> [storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:494)
> [storm-core-1.0.2.jar:1.0.2]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> The method and line number in question below:
> @Override
> public void ack(Object messageId) {
> final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
> if (!consumerAutoCommitMode) { // Only need to keep track of acked
> tuples if commits are not done automatically
> acked.get(msgId.getTopicPartition()).add(msgId);
> }
> emitted.remove(msgId);
> }
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)