[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468572#comment-16468572
 ] 

Rajini Sivaram commented on KAFKA-6870:
---------------------------------------

I have added you as Contributor and assigned the Jira to you.

> Concurrency conflicts in SampledStat
> ------------------------------------
>
>                 Key: KAFKA-6870
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6870
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>             Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat are not thread-safe. However, the 
> ReplicaFetcherThreads that handle replication from specific brokers may update 
> the samples (when the list is empty, a new sample is added to it) and iterate 
> over them concurrently, which causes a ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
>         at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
>         at java.util.ArrayList$Itr.next(ArrayList.java:859)
>         at org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
>         at org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
>         at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
>         at org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
>         at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
>         at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
>         at kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
>         at kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
>         at kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
>         at kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
>         at kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
>         at kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before commit 
> https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35
> the ConcurrentModificationException did not occur. Since the only change 
> currently made to samples is "add", iterating with get(index) avoids the 
> ConcurrentModificationException.
> In short, we can either make samples thread-safe, or replace the foreach 
> loop with get(index) if we are concerned about the performance of a 
> thread-safe list.
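
A minimal sketch of the difference between the two loop styles mentioned in the description. This is not the Kafka code: the class and method names are hypothetical, and the concurrent "add" is simulated on a single thread so the behaviour is deterministic. It only illustrates that an ArrayList iterator fails fast when the list grows during a for-each loop, while a get(index) loop does not.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical illustration; not taken from SampledStat or Rate.
public class SamplesIterationSketch {

    // For-each loop: uses ArrayList's fail-fast iterator.
    // One element is appended during the first iteration to stand in for
    // another thread adding a sample while this loop is running.
    public static double sumWithForEach(List<Double> samples) {
        double total = 0.0;
        boolean added = false;
        for (double s : samples) {
            total += s;
            if (!added) {
                samples.add(0.0); // structural modification during iteration
                added = true;
            }
        }
        return total;
    }

    // Index-based loop: no iterator, so no comodification check.
    // The size is re-read on every pass, so the appended element is visited too.
    public static double sumWithIndex(List<Double> samples) {
        double total = 0.0;
        boolean added = false;
        for (int i = 0; i < samples.size(); i++) {
            total += samples.get(i);
            if (!added) {
                samples.add(0.0); // same modification, tolerated here
                added = true;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Double> a = new ArrayList<>();
        a.add(1.0);
        a.add(2.0);
        System.out.println("index loop total: " + sumWithIndex(a));

        List<Double> b = new ArrayList<>();
        b.add(1.0);
        b.add(2.0);
        try {
            sumWithForEach(b);
        } catch (ConcurrentModificationException e) {
            System.out.println("for-each loop threw ConcurrentModificationException");
        }
    }
}
```

Note that the index-based loop only avoids the exception. A plain ArrayList shared between threads still gives no visibility or atomicity guarantees, so this is the cheaper of the two options in the description, not a replacement for a thread-safe list.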



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
