I don't think beginningOffsets itself is null. I think it's missing an entry for one of the partitions, which would make the right-hand side of that line null, and unboxing null into a primitive long throws an NPE.
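A minimal, self-contained illustration of that failure mode. KafkaConsumer.beginningOffsets() returns a Map<TopicPartition, Long>; here String keys stand in for TopicPartition (an assumption made only so the example runs without the Kafka client). The map is non-null, but get() on an absent key returns null, and assigning that to a long throws:

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: String keys stand in for Kafka's TopicPartition so the
// example runs without the Kafka client on the classpath.
public class UnboxingNpeDemo {

    // Returns true if assigning offsets.get(partition) to a primitive long
    // throws a NullPointerException, i.e. the key is absent from the map.
    // (Catching NPE is done here purely to demonstrate the behaviour.)
    public static boolean unboxingThrows(Map<String, Long> offsets, String partition) {
        try {
            // Same shape as the failing line: Long -> long auto-unboxing.
            long earliestTimeOffset = offsets.get(partition);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // The map itself is non-null...
        Map<String, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put("mytopic-0", 0L);
        // ...but "mytopic-1" was requested and is missing from the result.

        System.out.println(unboxingThrows(beginningOffsets, "mytopic-0")); // false
        System.out.println(unboxingThrows(beginningOffsets, "mytopic-1")); // true
    }
}
```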
I think this could be due to https://issues.apache.org/jira/browse/KAFKA-7044, going by the commit message for the fix (https://github.com/apache/kafka/commit/e2ec2d79c8d5adefc0c764583cec47144dbc5705#diff-b45245913eaae46aa847d2615d62cde0). Part 2 in particular sounds a lot like what I think might be happening here:

"`ConsumerGroupCommand.getLogEndOffsets()` and `getLogStartOffsets()` assumed that endOffsets()/beginningOffsets() which eventually call Fetcher.fetchOffsetsByTimes(), would return a map with all the topic partitions passed to endOffsets()/beginningOffsets() and that values are not null. Because of (1), null values were possible if some of the topic partitions were already known (in metadata cache) and some not (metadata cache did not have entries for some of the topic partitions). However, even with fixing (1), endOffsets()/beginningOffsets() may return a map with some topic partitions missing, when list offset request returns a non-retriable error."

KafkaOffsetMetric makes the same assumption: when it calls beginningOffsets() with a set of partitions, it expects the returned map to contain a value for every requested partition.

Could you try upgrading the Kafka client to 2.0.1? If necessary, we can also work around this on the Storm side by skipping the metrics for any requested partition that is missing from the beginningOffsets/endOffsets results. Feel free to raise an issue for this.

On Mon, 12 Nov 2018 at 21:56, Alexandre Vermeerbergen <[email protected]> wrote:
> Hello,
>
> Using Storm 1.2.3-snapshot of the 3rd of November 2018, with all libs
> (storm-core & storm-kafka-client) taken from the same Git snapshot, we get
> the following crash coming from a NullPointerException in
> KafkaOffsetMetric.getValueAndReset:
>
> 2018-11-12 19:31:30.496 o.a.s.util Thread-9-metricsFromKafka-executor[13 13] [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at org.apache.storm.daemon.executor$fn__9620$fn__9635$fn__9666.invoke(executor.clj:634) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at org.apache.storm.util$async_loop$fn__561.invoke(util.clj:484) [storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_192]
> Caused by: java.lang.NullPointerException
>     at org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:89) ~[stormjar.jar:?]
>     at org.apache.storm.daemon.executor$metrics_tick$fn__9544.invoke(executor.clj:345) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
>     at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
>     at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
>     at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
>     at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) ~[clojure-1.7.0.jar:?]
>     at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) ~[clojure-1.7.0.jar:?]
>     at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) ~[clojure-1.7.0.jar:?]
>     at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) ~[clojure-1.7.0.jar:?]
>     at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) ~[clojure-1.7.0.jar:?]
>     at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
>     at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
>     at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at org.apache.storm.daemon.executor$fn__9620$tuple_action_fn__9626.invoke(executor.clj:522) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at org.apache.storm.daemon.executor$mk_task_receiver$fn__9609.invoke(executor.clj:471) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at org.apache.storm.disruptor$clojure_handler$reify__9120.onEvent(disruptor.clj:41) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     ... 7 more
>
> In the source code, the NullPointerException comes from the following
> line of KafkaOffsetMetric.java:
>
>     long earliestTimeOffset = beginningOffsets.get(topicPartition);
>
> The NullPointerException crashes the worker process hosting the Spout,
> which leads to countless Netty error messages until the Spout is
> restored on another worker.
>
> Note: We are using Storm Kafka Client with Kafka Client 2.0.0 and
> Scala 2.12, on a cluster with 7 Supervisor nodes; the topology that is
> getting these crashes consumes a very high volume of data from a Kafka
> topic with 16 partitions.
> All of this runs on Oracle Java 8 update 192 on CentOS 7.
>
> Any idea why beginningOffsets could be null?
>
> Kind regards,
> Alexandre Vermeerbergen
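For reference, a minimal sketch of the Storm-side workaround suggested above: keep the offset lookups boxed and skip any partition missing from the beginningOffsets/endOffsets results instead of unboxing it. This is a hypothetical illustration, not Storm's actual KafkaOffsetMetric code; the method name spoutLag, the String keys (standing in for TopicPartition) and the lag definition (end offset minus committed offset) are all assumptions chosen only to show the pattern.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Storm's KafkaOffsetMetric. String keys stand in for
// TopicPartition; "lag" is simply end offset minus committed offset.
public class SkipMissingPartitions {

    // Returns partition -> lag for every committed partition that appears in
    // BOTH offset maps; partitions missing from either map are skipped.
    public static Map<String, Long> spoutLag(Map<String, Long> committedOffsets,
                                             Map<String, Long> beginningOffsets,
                                             Map<String, Long> endOffsets) {
        Map<String, Long> lag = new HashMap<>();
        for (Map.Entry<String, Long> entry : committedOffsets.entrySet()) {
            String partition = entry.getKey();
            // Keep the results boxed so a missing partition shows up as null
            // instead of throwing during unboxing.
            Long earliest = beginningOffsets.get(partition); // only null-checked here
            Long latest = endOffsets.get(partition);
            if (earliest == null || latest == null) {
                // Partition absent from the consumer's reply: skip it for this
                // metrics tick rather than crashing the worker.
                continue;
            }
            lag.put(partition, latest - entry.getValue());
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new HashMap<>();
        committed.put("topic-0", 90L);
        committed.put("topic-1", 40L);

        Map<String, Long> beginning = new HashMap<>();
        beginning.put("topic-0", 0L); // topic-1 missing, as in the reported crash

        Map<String, Long> end = new HashMap<>();
        end.put("topic-0", 100L);
        end.put("topic-1", 50L);

        System.out.println(spoutLag(committed, beginning, end)); // {topic-0=10}
    }
}
```

Skipping means the affected partition simply has no data point for that tick; the next tick reports it again once the consumer returns its offsets.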
