[ 
https://issues.apache.org/jira/browse/FLINK-9258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453095#comment-16453095
 ] 

Chesnay Schepler commented on FLINK-9258:
-----------------------------------------

This should only be possible if {{getAllVariables}} is called concurrently by 
multiple threads. The problem is that the internal map is assigned to the field 
before it is populated with data, at which point it is already accessable by 
other threads.

> ConcurrentModificationException in ComponentMetricGroup.getAllVariables
> -----------------------------------------------------------------------
>
>                 Key: FLINK-9258
>                 URL: https://issues.apache.org/jira/browse/FLINK-9258
>             Project: Flink
>          Issue Type: Bug
>          Components: Metrics
>    Affects Versions: 1.4.2
>            Reporter: Narayanan Arunachalam
>            Assignee: Chesnay Schepler
>            Priority: Major
>             Fix For: 1.5.0, 1.4.3
>
>
> Seeing this exception at the job startup time. Looks like there is a race 
> condition when the metrics variables are constructed.
> The error is intermittent.
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>         at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
>         at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>         at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>         at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>         at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>         at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>         at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.ConcurrentModificationException
>         at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
>         at java.util.HashMap$EntryIterator.next(HashMap.java:1471)
>         at java.util.HashMap$EntryIterator.next(HashMap.java:1469)
>         at java.util.HashMap.putMapEntries(HashMap.java:511)
>         at java.util.HashMap.putAll(HashMap.java:784)
>         at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>         at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>         at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.getTags(MetricsReporterRegistry.java:147)
>         at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.mergeWithSourceAndSinkTags(MetricsReporterRegistry.java:170)
>         at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.addReporter(MetricsReporterRegistry.java:75)
>         at 
> com.netflix.spaas.nfflink.connector.kafka.source.Kafka010Consumer.createFetcher(Kafka010Consumer.java:69)
>         at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:549)
>         at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>         at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>         at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>         at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to