[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468587#comment-16468587
 ] 

ASF GitHub Bot commented on KAFKA-6870:
---------------------------------------

chia7712 opened a new pull request #4985: KAFKA-6870 Concurrency conflicts in SampledStat
URL: https://github.com/apache/kafka/pull/4985
 
 
   See [KAFKA-6870](https://issues.apache.org/jira/browse/KAFKA-6870) for more details.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Concurrency conflicts in SampledStat
> ------------------------------------
>
>                 Key: KAFKA-6870
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6870
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>             Fix For: 2.0.0, 1.1.1
>
>
> The samples list stored in SampledStat is not thread-safe. However, the 
> ReplicaFetcherThreads that replicate from specific brokers may update the 
> list (when it is empty, a new sample is added to it) and iterate over it 
> concurrently, which causes a ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
>         at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
>         at java.util.ArrayList$Itr.next(ArrayList.java:859)
>         at org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
>         at org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
>         at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
>         at org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
>         at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
>         at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
>         at kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
>         at kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
>         at kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
>         at kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
>         at kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
>         at kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException did not occur. Since the only 
> modification currently made to samples is "add", iterating with get(index) 
> avoids the ConcurrentModificationException.
> In short, we can simply make samples thread-safe, or replace the foreach 
> loop with get(index) if we are concerned about the performance of a 
> thread-safe list.
>  
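
The two options described in the issue can be illustrated with a minimal sketch. Everything in it is an assumption made for illustration: the `Sample` class is a simplified stand-in and the helper names (`combineForEach`, `combineByIndex`, `twoSamples`) are hypothetical, not taken from Kafka's code. The concurrent "add" is simulated deterministically on a single thread with a callback, so the sketch shows the iterator behaviour rather than a real race.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class SampleIterationSketch {

    /** Hypothetical stand-in for a stat sample; not Kafka's actual class. */
    static final class Sample {
        final double value;
        Sample(double value) { this.value = value; }
    }

    /** Builds a fresh, mutable ArrayList holding two samples (1.0 and 2.0). */
    static List<Sample> twoSamples() {
        return new ArrayList<>(Arrays.asList(new Sample(1.0), new Sample(2.0)));
    }

    /** Sums with a for-each loop; runs midIteration once after the first element. */
    static double combineForEach(List<Sample> samples, Runnable midIteration) {
        double total = 0.0;
        boolean fired = false;
        for (Sample s : samples) {
            total += s.value;
            if (!fired) {
                fired = true;
                midIteration.run(); // stands in for an add from another thread
            }
        }
        return total;
    }

    /** Sums with get(index); runs midIteration once after the first element. */
    static double combineByIndex(List<Sample> samples, Runnable midIteration) {
        double total = 0.0;
        boolean fired = false;
        for (int i = 0; i < samples.size(); i++) {
            total += samples.get(i).value;
            if (!fired) {
                fired = true;
                midIteration.run(); // stands in for an add from another thread
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // for-each over ArrayList: the fail-fast iterator detects the add.
        List<Sample> plain = twoSamples();
        try {
            combineForEach(plain, () -> plain.add(new Sample(4.0)));
        } catch (ConcurrentModificationException e) {
            System.out.println("for-each: ConcurrentModificationException");
        }

        // get(index) over ArrayList: no iterator, so no exception; the new
        // element is picked up because size() is re-read on every pass.
        List<Sample> indexed = twoSamples();
        System.out.println(combineByIndex(indexed, () -> indexed.add(new Sample(4.0)))); // 7.0

        // Thread-safe list: for-each iterates over a snapshot taken when the
        // loop starts, so the add is neither seen nor reported as an error.
        List<Sample> cow = new CopyOnWriteArrayList<>(twoSamples());
        System.out.println(combineForEach(cow, () -> cow.add(new Sample(4.0)))); // 3.0
    }
}
```

Note that get(index) on a plain ArrayList only avoids the exception; it does not make the list thread-safe, and a reader on another thread has no visibility guarantee for newly added elements. A copy-on-write list gives safe iteration at the cost of copying the backing array on every add, which may or may not be acceptable for a metrics hot path.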



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
