Nick Cuneo created STORM-2207:
---------------------------------
Summary: Kafka Spout NullPointerException during ack
Key: STORM-2207
URL: https://issues.apache.org/jira/browse/STORM-2207
Project: Apache Storm
Issue Type: Bug
Reporter: Nick Cuneo
Priority: Critical
This occurs on startup of the topology. There should be some null-check
safeguards, but I'm not sure what causes it in the first place. My guess is
that the topic partition is not found in the acked map.
2016-11-17 23:11:05.366 o.a.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.lang.NullPointerException
    at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
    at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
    ... 7 more
2016-11-17 23:11:05.379 o.a.s.d.executor [ERROR]
java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.lang.NullPointerException
    at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
    at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
    ... 7 more
2016-11-17 23:11:05.473 o.a.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.worker$fn__8663$fn__8664.invoke(worker.clj:765) [storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$mk_executor_data$fn__7875$fn__7876.invoke(executor.clj:274) [storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:494) [storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
The method in question is below; the stack trace points to KafkaSpout.java:316:
    @Override
    public void ack(Object messageId) {
        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
        if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically
            acked.get(msgId.getTopicPartition()).add(msgId);
        }
        emitted.remove(msgId);
    }
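
To illustrate the kind of null-check safeguard mentioned above, here is a minimal, self-contained sketch. It is not the actual KafkaSpout code and not a proposed patch: the class NullSafeAckSketch, the simplified MsgId type, and the assignPartition, emit, and pendingCount helpers are all hypothetical stand-ins. It only shows that Map.get returns null for a key that was never registered, and how ack could skip such a message instead of throwing. Whether skipping is the right behavior, and why the partition would be missing at all, remains open.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified stand-in for the spout's ack bookkeeping. */
class NullSafeAckSketch {

    /** Hypothetical message id: a partition name plus an offset (identity equality). */
    static final class MsgId {
        final String topicPartition;
        final long offset;

        MsgId(String topicPartition, long offset) {
            this.topicPartition = topicPartition;
            this.offset = offset;
        }
    }

    // Acked offsets per partition; a partition is present only after assignPartition().
    private final Map<String, Set<Long>> acked = new HashMap<>();
    // Messages emitted but not yet acked.
    private final Set<MsgId> emitted = new HashSet<>();
    private final boolean consumerAutoCommitMode;

    NullSafeAckSketch(boolean consumerAutoCommitMode) {
        this.consumerAutoCommitMode = consumerAutoCommitMode;
    }

    /** Registers a partition so that acks for it can be tracked. */
    void assignPartition(String topicPartition) {
        acked.putIfAbsent(topicPartition, new HashSet<>());
    }

    void emit(MsgId msgId) {
        emitted.add(msgId);
    }

    /**
     * Records the ack if the partition is tracked.
     * Returns true if the offset was recorded, false if it was skipped.
     */
    boolean ack(MsgId msgId) {
        boolean recorded = false;
        if (!consumerAutoCommitMode) {
            // Map.get returns null when the partition was never registered;
            // calling add() on that result is what would raise the NPE.
            Set<Long> offsets = acked.get(msgId.topicPartition);
            if (offsets != null) {
                offsets.add(msgId.offset);
                recorded = true;
            }
            // Assumption: an ack for an untracked partition is silently skipped.
        }
        emitted.remove(msgId);
        return recorded;
    }

    int pendingCount() {
        return emitted.size();
    }
}
```

In this sketch, an ack for a partition registered through assignPartition is recorded and returns true, while an ack for an unknown partition returns false rather than killing the worker. In both cases the message is removed from the emitted set.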