Nick Cuneo created STORM-2207:
---------------------------------

             Summary: Kafka Spout NullPointerException during ack
                 Key: STORM-2207
                 URL: https://issues.apache.org/jira/browse/STORM-2207
             Project: Apache Storm
          Issue Type: Bug
            Reporter: Nick Cuneo
            Priority: Critical


This occurs on startup of the topology.  There should be some null check 
safeguards, but i'm not sure what's causing it to occur in the first place...my 
guess is  the topic partition is not found in the ack map.

2016-11-17 23:11:05.366 o.a.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
    at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464)
 ~[storm-core-1.0.2.jar:1.0.2]
    at 
org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430)
 ~[storm-core-1.0.2.jar:1.0.2]
    at 
org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) 
~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) 
~[storm-core-1.0.2.jar:1.0.2]
    at 
org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628)
 ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
[storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.lang.NullPointerException
    at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) 
~[stormjar.jar:?]
    at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) 
~[storm-core-1.0.2.jar:1.0.2]
    at 
org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536)
 ~[storm-core-1.0.2.jar:1.0.2]
    at 
org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464)
 ~[storm-core-1.0.2.jar:1.0.2]
    at 
org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40)
 ~[storm-core-1.0.2.jar:1.0.2]
    at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451)
 ~[storm-core-1.0.2.jar:1.0.2]
    ... 7 more
2016-11-17 23:11:05.379 o.a.s.d.executor [ERROR] 
java.lang.RuntimeException: java.lang.NullPointerException
    at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464)
 ~[storm-core-1.0.2.jar:1.0.2]
    at 
org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430)
 ~[storm-core-1.0.2.jar:1.0.2]
    at 
org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) 
~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) 
~[storm-core-1.0.2.jar:1.0.2]
    at 
org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628)
 ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
[storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.lang.NullPointerException
    at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) 
~[stormjar.jar:?]
    at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) 
~[storm-core-1.0.2.jar:1.0.2]
    at 
org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536)
 ~[storm-core-1.0.2.jar:1.0.2]
    at 
org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464)
 ~[storm-core-1.0.2.jar:1.0.2]
    at 
org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40)
 ~[storm-core-1.0.2.jar:1.0.2]
    at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451)
 ~[storm-core-1.0.2.jar:1.0.2]
    ... 7 more
2016-11-17 23:11:05.473 o.a.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) 
[storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.worker$fn__8663$fn__8664.invoke(worker.clj:765) 
[storm-core-1.0.2.jar:1.0.2]
    at 
org.apache.storm.daemon.executor$mk_executor_data$fn__7875$fn__7876.invoke(executor.clj:274)
 [storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:494) 
[storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]

The method and line number in question below:

@Override
    public void ack(Object messageId) {
        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
        if (!consumerAutoCommitMode) {  // Only need to keep track of acked 
tuples if commits are not done automatically
            acked.get(msgId.getTopicPartition()).add(msgId);
        }
        emitted.remove(msgId);
}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to