I don't think beginningOffsets itself is null. I think the returned map is
missing one of the partitions, so beginningOffsets.get(topicPartition)
returns null, and unboxing that null into a primitive long throws the NPE.
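
To illustrate the mechanism, here is a minimal sketch that uses a plain
Map<String, Long> in place of the real Map<TopicPartition, Long>. The class
name, method name and the "events-0"/"events-1" keys are made up for the
example; only the assignment line mirrors the one quoted from
KafkaOffsetMetric.

```java
import java.util.HashMap;
import java.util.Map;

public class UnboxingNpeSketch {

    // Returns true if the lookup-and-unbox throws a NullPointerException.
    public static boolean lookupThrowsNpe(Map<String, Long> beginningOffsets, String topicPartition) {
        try {
            // Map.get returns null for a missing key. Assigning the result to a
            // primitive long makes the compiler insert Long.longValue(), which
            // fails on null.
            long earliestTimeOffset = beginningOffsets.get(topicPartition);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put("events-0", 42L); // partition present in the map

        System.out.println(lookupThrowsNpe(beginningOffsets, "events-0")); // false
        System.out.println(lookupThrowsNpe(beginningOffsets, "events-1")); // true
    }
}
```

The map is non-null in both calls; only the missing entry triggers the
exception.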

I think this could be due to
https://issues.apache.org/jira/browse/KAFKA-7044, going by the commit
message for the fix
https://github.com/apache/kafka/commit/e2ec2d79c8d5adefc0c764583cec47144dbc5705#diff-b45245913eaae46aa847d2615d62cde0.
Specifically part 2 sounds a lot like what I think might be happening here.

"

`ConsumerGroupCommand.getLogEndOffsets()` and `getLogStartOffsets()`
assumed that endOffsets()/beginningOffsets() which eventually call
Fetcher.fetchOffsetsByTimes(), would return a map with all the topic
partitions passed to endOffsets()/beginningOffsets() and that values
are not null. Because of (1), null values were possible if some of the
topic partitions were already known (in metadata cache) and some not
(metadata cache did not have entries for some of the topic
partitions). However, even with fixing (1),
endOffsets()/beginningOffsets() may return a map with some topic
partitions missing, when list offset request returns a non-retriable
error.

"

Basically KafkaOffsetMetric makes the same assumption: that when
beginningOffsets(partitions) is called, the returned map will contain a
value for every requested topic partition.
Could you try upgrading to Kafka 2.0.1?

If necessary we can also work around this on the Storm side by skipping the
metrics if the requested partition isn't in the return values for
beginningOffsets/endOffsets. Feel free to raise an issue for this.
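
A rough sketch of what that workaround could look like, not the actual Storm
code. It is written against generic maps so it runs without Kafka on the
classpath; in Storm the key type would be TopicPartition. The class name,
method names and the "latestTimeOffset" metric suffix are assumptions made
for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

public class OffsetLookupSketch {

    // Null-safe lookup: empty when the partition is absent or mapped to null.
    public static <K> OptionalLong offsetFor(Map<K, Long> offsets, K partition) {
        Long offset = offsets.get(partition);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }

    // Builds metrics only for partitions that have both offsets available.
    public static <K> Map<String, Long> collectMetrics(Iterable<K> partitions,
                                                       Map<K, Long> beginningOffsets,
                                                       Map<K, Long> endOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (K partition : partitions) {
            OptionalLong earliest = offsetFor(beginningOffsets, partition);
            OptionalLong latest = offsetFor(endOffsets, partition);
            if (!earliest.isPresent() || !latest.isPresent()) {
                continue; // skip this partition instead of throwing an NPE
            }
            result.put(partition + "/earliestTimeOffset", earliest.getAsLong());
            result.put(partition + "/latestTimeOffset", latest.getAsLong());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> beginning = new HashMap<>();
        Map<String, Long> end = new HashMap<>();
        beginning.put("events-0", 5L);
        end.put("events-0", 100L);
        end.put("events-1", 70L); // no beginning offset returned for events-1

        // Only the events-0 metrics are reported.
        System.out.println(collectMetrics(Arrays.asList("events-0", "events-1"), beginning, end));
    }
}
```

Skipping means the metrics for an affected partition are simply absent for
that tick, which seems preferable to killing the worker.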

On Mon, 12 Nov 2018 at 21:56, Alexandre Vermeerbergen <
[email protected]>:

> Hello,
>
> Using Storm 1.2.3-snapshot of the 3rd of November 2018 with all libs
> (storm-core & storm-kafka-client) taken from same Git, we get the
> following crash coming from a NullPointerException in
> KafkaOffsetMetric.getValueAndReset :
>
> 2018-11-12 19:31:30.496 o.a.s.util
> Thread-9-metricsFromKafka-executor[13 13] [ERROR] Async loop died!
>
> java.lang.RuntimeException: java.lang.NullPointerException
>
>           at
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>           at
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>           at
> org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>           at
> org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>           at
> org.apache.storm.daemon.executor$fn__9620$fn__9635$fn__9666.invoke(executor.clj:634)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>           at org.apache.storm.util$async_loop$fn__561.invoke(util.clj:484)
> [storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>           at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>
>           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_192]
>
> Caused by: java.lang.NullPointerException
>
>           at
> org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:89)
> ~[stormjar.jar:?]
>
>           at
> org.apache.storm.daemon.executor$metrics_tick$fn__9544.invoke(executor.clj:345)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>           at clojure.core$map$fn__4553.invoke(core.clj:2622)
> ~[clojure-1.7.0.jar:?]
>
>           at clojure.lang.LazySeq.sval(LazySeq.java:40)
> ~[clojure-1.7.0.jar:?]
>
>           at clojure.lang.LazySeq.seq(LazySeq.java:49)
> ~[clojure-1.7.0.jar:?]
>
>           at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
>
>           at clojure.core$seq__4128.invoke(core.clj:137)
> ~[clojure-1.7.0.jar:?]
>
>           at clojure.core$filter$fn__4580.invoke(core.clj:2679)
> ~[clojure-1.7.0.jar:?]
>
>           at clojure.lang.LazySeq.sval(LazySeq.java:40)
> ~[clojure-1.7.0.jar:?]
>
>           at clojure.lang.LazySeq.seq(LazySeq.java:49)
> ~[clojure-1.7.0.jar:?]
>
>           at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
>
>           at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
>
>           at clojure.core$next__4112.invoke(core.clj:64)
> ~[clojure-1.7.0.jar:?]
>
>           at clojure.core.protocols$fn__6523.invoke(protocols.clj:170)
> ~[clojure-1.7.0.jar:?]
>
>           at
> clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19)
> ~[clojure-1.7.0.jar:?]
>
>           at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31)
> ~[clojure-1.7.0.jar:?]
>
>           at clojure.core.protocols$fn__6506.invoke(protocols.clj:101)
> ~[clojure-1.7.0.jar:?]
>
>           at
> clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13)
> ~[clojure-1.7.0.jar:?]
>
>           at clojure.core$reduce.invoke(core.clj:6519)
> ~[clojure-1.7.0.jar:?]
>
>           at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
>
>           at
> org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>           at
> org.apache.storm.daemon.executor$fn__9620$tuple_action_fn__9626.invoke(executor.clj:522)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>           at
> org.apache.storm.daemon.executor$mk_task_receiver$fn__9609.invoke(executor.clj:471)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>           at
> org.apache.storm.disruptor$clojure_handler$reify__9120.onEvent(disruptor.clj:41)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>           at
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>           ... 7 more
>
>
> In source code, the null pointer exception comes from the following
> line of KafkaOffsetMetric.java:
>
> long earliestTimeOffset = beginningOffsets.get(topicPartition);
>
> The NullPointerException crashes the worker process hosting the
> Spout, which leads to countless Netty error messages until the Spout
> is restored on another worker.
>
> Note: We are using Storm Kafka Client with Kafka Client 2.0.0 and
> Scala 2.12, on a cluster with 7 Supervisor nodes; the topology that
> is getting these crashes consumes a very high volume of data from a
> Kafka topic with 16 partitions.
> All of this is running on Oracle Java 8 update 192 on CentOS 7.
>
> Any idea why beginningOffsets could be null ?
>
> Kind regards,
> Alexandre Vermeerbergen
>
