[ https://issues.apache.org/jira/browse/STORM-4055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rui Abreu updated STORM-4055:
-----------------------------
Fix Version/s: 2.7.0
(was: 2.7.1)
> ConcurrentModificationException on KafkaConsumer when running topology with Metrics Reporters
> ---------------------------------------------------------------------------------------------
>
> Key: STORM-4055
> URL: https://issues.apache.org/jira/browse/STORM-4055
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 2.6.1
> Reporter: Anthony Castrati
> Priority: Major
> Fix For: 2.7.0
>
> Time Spent: 40m
> Remaining Estimate: 0h
>
> After a recent upgrade to storm-kafka-client on a Storm 2.6.1 server, we are
> seeing a ConcurrentModificationException in our topology at runtime. I believe
> this is caused by the re-use of a single KafkaConsumer instance between the
> KafkaSpout and the KafkaOffsetPartitionMetrics, which were added somewhere
> between 2.4.0 and 2.6.1.
>
> h3. Steps to Reproduce:
> Configure a topology with a basic KafkaSpout and one of the metrics reporters.
> We first hit this with our own custom reporter, but reproduced it with
> ConsoleStormReporter as well. The JMXReporter did not reproduce the issue for
> us, but we did not dig into why. A minimal reproduction sketch follows the
> config below.
> *reporter config:*
> {code}
> topology.metrics.reporters: [
>   {
>     "filter": {
>       "expression": ".*",
>       "class": "org.apache.storm.metrics2.filters.RegexFilter"
>     },
>     "report.period": 15,
>     "report.period.units": "SECONDS",
>     "class": "org.apache.storm.metrics2.reporters.ConsoleStormReporter"
>   }
> ]
> {code}
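> For reference, a minimal topology along these lines should be enough to
> reproduce. This is only a sketch: the broker address, topic, and topology name
> are placeholders, and the reporter config above is assumed to be present in
> the topology or cluster YAML.
> {code:java}
> import org.apache.storm.Config;
> import org.apache.storm.StormSubmitter;
> import org.apache.storm.kafka.spout.KafkaSpout;
> import org.apache.storm.kafka.spout.KafkaSpoutConfig;
> import org.apache.storm.topology.TopologyBuilder;
>
> public class KafkaCmeReproTopology {
>     public static void main(String[] args) throws Exception {
>         // Basic KafkaSpout reading from a placeholder topic.
>         KafkaSpoutConfig<String, String> spoutConfig =
>                 KafkaSpoutConfig.builder("localhost:9092", "test-topic").build();
>
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 1);
>
>         // With the reporter config above in effect, the reporter's scheduled
>         // thread reads the kafkaOffset gauges and the exception below appears.
>         StormSubmitter.submitTopology("kafka-cme-repro", new Config(),
>                 builder.createTopology());
>     }
> }
> {code}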
> h3. Stacktrace:
> {quote}[ERROR] Exception thrown from NewRelicReporter#report. Exception was suppressed.
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: metrics-newRelicReporter-1-thread-1, id: 24) otherThread(id: 40)
> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2484) ~[stormjar.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2465) ~[stormjar.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2144) ~[stormjar.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2123) ~[stormjar.jar:?]
> at org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics.getBeginningOffsets(KafkaOffsetPartitionMetrics.java:181) ~[stormjar.jar:?]
> at org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics$2.getValue(KafkaOffsetPartitionMetrics.java:93) ~[stormjar.jar:?]
> at org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics$2.getValue(KafkaOffsetPartitionMetrics.java:90) ~[stormjar.jar:?]
> at com.codahale.metrics.newrelic.transformer.GaugeTransformer.transform(GaugeTransformer.java:60) ~[stormjar.jar:?]
> at com.codahale.metrics.newrelic.NewRelicReporter.lambda$transform$0(NewRelicReporter.java:154) ~[stormjar.jar:?]
> at java.base/java.util.stream.ReferencePipeline$7$1.accept(Unknown Source) ~[?:?]
> at java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet.lambda$entryConsumer$0(Unknown Source) ~[?:?]
> at java.base/java.util.TreeMap$EntrySpliterator.forEachRemaining(Unknown Source) ~[?:?]
> at java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntrySetSpliterator.forEachRemaining(Unknown Source) ~[?:?]
> at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
> at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown Source) ~[?:?]
> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown Source) ~[?:?]
> at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
> at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source) ~[?:?]
> at java.base/java.util.stream.ReferencePipeline$7$1.accept(Unknown Source) ~[?:?]
> at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Unknown Source) ~[?:?]
> at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
> at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source) ~[?:?]
> at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
> at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
> at com.codahale.metrics.newrelic.NewRelicReporter.report(NewRelicReporter.java:138) ~[stormjar.jar:?]
> at com.codahale.metrics.ScheduledReporter.report(ScheduledReporter.java:243) ~[metrics-core-3.2.6.jar:3.2.6]
> at com.codahale.metrics.ScheduledReporter$1.run(ScheduledReporter.java:182) [metrics-core-3.2.6.jar:3.2.6]
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
> at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source) [?:?]
> at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
> at java.base/java.lang.Thread.run(Unknown Source) [?:?]
> {quote}
> h3. Workaround
> Configure the reporter's filter with a RegexFilter (or similar) whose
> expression excludes the KafkaOffsetPartitionMetrics gauges.
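> For example, something along these lines. The exact expression is
> illustrative: it assumes RegexFilter matches against the full metric name and
> that the offset gauges are registered under names containing
> {{kafkaOffset}}; verify both against your own metric output first.
> {code}
> topology.metrics.reporters: [
>   {
>     "filter": {
>       "expression": "^(?!.*kafkaOffset).*",
>       "class": "org.apache.storm.metrics2.filters.RegexFilter"
>     },
>     "report.period": 15,
>     "report.period.units": "SECONDS",
>     "class": "org.apache.storm.metrics2.reporters.ConsoleStormReporter"
>   }
> ]
> {code}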
> h3. Impact
> I am concerned that, depending on the timing of access to the spout's
> consumer, the offending metric calls could fast-forward or rewind the spout.
> I did not do any further testing to see whether the consumer's lock could be
> mismanaged in a way that directly impacts the spout, but it seems feasible.
> The impact may need to be revised upward if it is confirmed that a simple
> metrics reporter can cause events to be skipped or re-processed.
> h3. Potential Code Issues:
> *KafkaSpout.java*
> {code:java}
> private transient Consumer<K, V> consumer;
> ...
> public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
>     ...
>     // this consumer will be used by the spout everywhere
>     consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
>     tupleListener.open(conf, context);
>     this.kafkaOffsetMetricManager
>         = new KafkaOffsetMetricManager<>(() -> Collections.unmodifiableMap(offsetManagers), () -> consumer, context);
> {code}
> {{() -> consumer}} does not appear to be a safe supplier: it hands the metrics
> thread the very same KafkaConsumer instance the KafkaSpout uses on its own
> thread, and KafkaConsumer is not thread-safe.
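> One possible direction (a sketch only, not necessarily the fix that ships in
> 2.7.0): create a second consumer from the same factory and hand that one to
> the metric manager, so the reporter thread never touches the spout's consumer.
> {{metricsConsumer}} is a hypothetical new field.
> {code:java}
> // In KafkaSpout.open(), sketch only:
> consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
> // Hypothetical dedicated instance for the metrics thread. It would only ever
> // call beginningOffsets()/endOffsets() and never subscribe, so it should not
> // join the consumer group or trigger rebalances.
> metricsConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
> tupleListener.open(conf, context);
> this.kafkaOffsetMetricManager = new KafkaOffsetMetricManager<>(
>         () -> Collections.unmodifiableMap(offsetManagers),
>         () -> metricsConsumer, // reporter thread gets its own instance
>         context);
> {code}
> The alternative of synchronizing the metric calls against the spout's own
> consumer would serialize the reporter with the poll loop, which seems worse
> for the spout's throughput.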
> *KafkaOffsetPartitionMetrics.java: getBeginningOffsets, getEndOffsets*
>
> {code:java}
> private Map<TopicPartition, Long> getBeginningOffsets(Set<TopicPartition> topicPartitions) {
>     Consumer<K, V> consumer = consumerSupplier.get();
>     ...
>     try {
>         // This will actually try to modify the KafkaSpout's consumer
>         // instance, which could negatively impact the spout
>         beginningOffsets = consumer.beginningOffsets(topicPartitions);
>     }
>     ...
> }
>
> private Map<TopicPartition, Long> getEndOffsets(Set<TopicPartition> topicPartitions) {
>     Consumer<K, V> consumer = consumerSupplier.get();
>     ...
>     try {
>         // This will actually try to modify the KafkaSpout's consumer
>         // instance, which could negatively impact the spout
>         endOffsets = consumer.endOffsets(topicPartitions);
>     }
>     ...
> }
> {code}
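> For what it's worth, KafkaConsumer's single-thread guard is easy to trip in
> isolation, independent of Storm. A self-contained sketch (broker address and
> topic are placeholders; no broker even needs to be running, since the
> {{acquire()}} check fires before any network I/O):
> {code:java}
> import java.time.Duration;
> import java.util.List;
> import java.util.Properties;
> import java.util.Set;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
>
> public class ConsumerGuardDemo {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092"); // placeholder
>         props.put("group.id", "guard-demo");
>         props.put("key.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>         consumer.subscribe(List.of("test-topic")); // placeholder topic
>
>         // "Spout" thread: holds the consumer's thread guard for the whole poll.
>         new Thread(() -> consumer.poll(Duration.ofSeconds(30))).start();
>         Thread.sleep(500); // give the poll time to start
>
>         // "Metrics reporter" thread (here: main). Expected to throw
>         // java.util.ConcurrentModificationException: KafkaConsumer is not
>         // safe for multi-threaded access.
>         consumer.beginningOffsets(Set.of(new TopicPartition("test-topic", 0)));
>     }
> }
> {code}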
--
This message was sent by Atlassian Jira
(v8.20.10#820010)