Anthony Castrati created STORM-4055:
---------------------------------------

             Summary: ConcurrentModificationException on KafkaConsumer when running topology with Metrics Reporters
                 Key: STORM-4055
                 URL: https://issues.apache.org/jira/browse/STORM-4055
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka-client
    Affects Versions: 2.6.1
            Reporter: Anthony Castrati


After a recent upgrade to storm-kafka-client on Storm 2.6.1, we are seeing a ConcurrentModificationException in our topology at runtime. I believe this is caused by re-use of a single KafkaConsumer instance between the KafkaSpout and the KafkaOffsetPartitionMetrics, which was added some time between 2.4.0 and 2.6.1.

h3. Steps to Reproduce:

Configure a topology with a basic KafkaSpout, and configure one of the metrics reporters on it. We first hit this with our own custom reporter, but reproduced it with ConsoleStormReporter as well. The JMXReporter did not reproduce the issue for us, but we did not dig into why.
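For reference, a minimal submission sketch (broker address, topic, and topology names are placeholders; the reporter itself is picked up from the topology.metrics.reporters config below, not from code):
{quote}import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class ReproTopology {
    public static void main(String[] args) throws Exception {
        // Any topic with traffic should do; names here are placeholders.
        KafkaSpoutConfig<String, String> spoutConfig =
            KafkaSpoutConfig.builder("localhost:9092", "test-topic").build();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 1);

        // The metrics reporter comes from topology.metrics.reporters in the
        // topology/cluster config (see the reporter config below).
        StormSubmitter.submitTopology("cme-repro", new Config(), builder.createTopology());
    }
}
{quote}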

reporter config:
{quote}topology.metrics.reporters: [
  {
    "filter": {
      "expression": ".*",
      "class": "org.apache.storm.metrics2.filters.RegexFilter"
    },
    "report.period": 15,
    "report.period.units": "SECONDS",
    "class": "org.apache.storm.metrics2.reporters.ConsoleStormReporter"
  }
]
{quote}
h3. Stacktrace:
{quote}[ERROR] Exception thrown from NewRelicReporter#report. Exception was suppressed.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: metrics-newRelicReporter-1-thread-1, id: 24) otherThread(id: 40)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2484) ~[stormjar.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2465) ~[stormjar.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2144) ~[stormjar.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2123) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics.getBeginningOffsets(KafkaOffsetPartitionMetrics.java:181) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics$2.getValue(KafkaOffsetPartitionMetrics.java:93) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics$2.getValue(KafkaOffsetPartitionMetrics.java:90) ~[stormjar.jar:?]
    at com.codahale.metrics.newrelic.transformer.GaugeTransformer.transform(GaugeTransformer.java:60) ~[stormjar.jar:?]
    at com.codahale.metrics.newrelic.NewRelicReporter.lambda$transform$0(NewRelicReporter.java:154) ~[stormjar.jar:?]
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(Unknown Source) ~[?:?]
    at java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet.lambda$entryConsumer$0(Unknown Source) ~[?:?]
    at java.base/java.util.TreeMap$EntrySpliterator.forEachRemaining(Unknown Source) ~[?:?]
    at java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntrySetSpliterator.forEachRemaining(Unknown Source) ~[?:?]
    at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown Source) ~[?:?]
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown Source) ~[?:?]
    at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
    at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source) ~[?:?]
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(Unknown Source) ~[?:?]
    at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Unknown Source) ~[?:?]
    at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source) ~[?:?]
    at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
    at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
    at com.codahale.metrics.newrelic.NewRelicReporter.report(NewRelicReporter.java:138) ~[stormjar.jar:?]
    at com.codahale.metrics.ScheduledReporter.report(ScheduledReporter.java:243) ~[metrics-core-3.2.6.jar:3.2.6]
    at com.codahale.metrics.ScheduledReporter$1.run(ScheduledReporter.java:182) [metrics-core-3.2.6.jar:3.2.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
    at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source) [?:?]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.base/java.lang.Thread.run(Unknown Source) [?:?]
{quote}
h3. Workaround

Configure the reporter with a RegexFilter (or similar) whose expression excludes the KafkaOffsetPartitionMetrics gauges.
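For example, assuming the offset gauges are registered under a kafkaOffset name prefix (the exact metric names may vary by Storm version, so verify against your reporter's output), a negative-lookahead expression keeps everything else:
{quote}topology.metrics.reporters: [
  {
    "filter": {
      "expression": "^(?!kafkaOffset).*",
      "class": "org.apache.storm.metrics2.filters.RegexFilter"
    },
    "report.period": 15,
    "report.period.units": "SECONDS",
    "class": "org.apache.storm.metrics2.reporters.ConsoleStormReporter"
  }
]
{quote}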
h3. Impact

I am concerned that, depending on the timing of access to the shared consumer, the offending metric calls could fast-forward or rewind the spout. I did not do any further testing to see whether the consumer's thread guard could be mis-managed in a way that directly impacts the spout, but it seems feasible. The impact may need to be raised if it is confirmed that a simple metrics reporter can cause events to be skipped or re-processed.
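For reference, the consumer's single-thread guard can be tripped outside Storm with a minimal, timing-dependent sketch (no live broker is required, since acquire() fails before any network call succeeds; all names here are illustrative):
{quote}import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SharedConsumerCme {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "cme-demo");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("test-topic"));

        // Thread A (the "spout") holds the consumer's thread guard inside poll().
        new Thread(() -> consumer.poll(Duration.ofSeconds(10))).start();
        Thread.sleep(500); // give poll() time to acquire the guard

        // Thread B (the "reporter") touches the same instance and should hit
        // ConcurrentModificationException in KafkaConsumer.acquire().
        consumer.beginningOffsets(Set.of(new TopicPartition("test-topic", 0)));
    }
}
{quote}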
h3. Potential Code Issues:

KafkaSpout.java
{quote}private transient Consumer<K, V> consumer;
...

public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        ...
        //this consumer will be used by the spout everywhere
        consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
        tupleListener.open(conf, context);
        this.kafkaOffsetMetricManager
            = new KafkaOffsetMetricManager<>(() -> Collections.unmodifiableMap(offsetManagers), () -> consumer, context);
{quote}
() -> consumer does not appear to be a safe supplier: it hands another thread the same KafkaConsumer instance the KafkaSpout uses, and KafkaConsumer is not thread-safe.
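One possible direction (a sketch only, not a proposed patch) would be for open() to create a second consumer dedicated to the metrics path, so the reporter thread never touches the spout's instance. The metricsConsumer field below is hypothetical; close() would also have to close it, and whether a per-task second consumer is acceptable overhead (or needs a distinct client.id) would need to be worked out:
{quote}// Sketch: give the metric manager its own consumer instead of the spout's.
consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
metricsConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
tupleListener.open(conf, context);
this.kafkaOffsetMetricManager
    = new KafkaOffsetMetricManager<>(() -> Collections.unmodifiableMap(offsetManagers),
        () -> metricsConsumer, context); // only ever used from the reporter thread
{quote}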

KafkaOffsetPartitionMetrics.java: getBeginningOffsets, getEndOffsets
{quote}private Map<TopicPartition, Long> getBeginningOffsets(Set<TopicPartition> topicPartitions) {
    Consumer<K, V> consumer = consumerSupplier.get();
    ...
    try {
        // This will actually try to modify the KafkaSpout instance of the consumer which could negatively impact the spout
        beginningOffsets = consumer.beginningOffsets(topicPartitions);
    }
    ...
}

private Map<TopicPartition, Long> getEndOffsets(Set<TopicPartition> topicPartitions) {
    Consumer<K, V> consumer = consumerSupplier.get();
    ...
    try {
        // This will actually try to modify the KafkaSpout instance of the consumer which could negatively impact the spout
        endOffsets = consumer.endOffsets(topicPartitions);
    }
    ...
}
{quote}
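Alternatively, the metrics path could query offsets through a client that is documented as thread-safe. A sketch using the Kafka Admin client (Admin, OffsetSpec, and ListOffsetsResult are standard Kafka client APIs; wiring an Admin instance into the metric manager is hypothetical):
{quote}import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

final class OffsetFetcher {
    // Admin is safe to share across threads, unlike KafkaConsumer, so the
    // reporter thread could call this without touching the spout's consumer.
    static Map<TopicPartition, Long> beginningOffsets(
            Admin admin, Set<TopicPartition> partitions) throws Exception {
        return admin
            .listOffsets(partitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.earliest())))
            .all().get().entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
    }
}
{quote}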
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
