[ 
https://issues.apache.org/jira/browse/FLINK-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16218388#comment-16218388
 ] 

ASF GitHub Bot commented on FLINK-7368:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146819586
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---
    @@ -260,50 +293,66 @@ public String getMetric(String name, String defaultValue) {
                                ? value
                                : defaultValue;
                }
    -   }
     
    -   /**
    -    * Sub-structure containing metrics of the JobManager.
    -    */
    -   public static class JobManagerMetricStore extends ComponentMetricStore {
    +           public static ComponentMetricStore unmodifiable(ComponentMetricStore source) {
    +                   if (source == null) {
    +                           return null;
    +                   }
    +                   return new ComponentMetricStore(unmodifiableMap(source.metrics));
    +           }
        }
     
        /**
         * Sub-structure containing metrics of a single TaskManager.
         */
    +   @ThreadSafe
        public static class TaskManagerMetricStore extends ComponentMetricStore {
    -           public final Set<String> garbageCollectorNames = new HashSet<>();
    +           public final Set<String> garbageCollectorNames;
    +
    +           public TaskManagerMetricStore() {
    +                   this(new ConcurrentHashMap<>(), ConcurrentHashMap.newKeySet());
    +           }
    +
    +           public TaskManagerMetricStore(Map<String, String> metrics, Set<String> garbageCollectorNames) {
    +                   super(metrics);
    +                   this.garbageCollectorNames = checkNotNull(garbageCollectorNames);
    +           }
     
                public void addGarbageCollectorName(String name) {
                        garbageCollectorNames.add(name);
                }
    +
    +           public static TaskManagerMetricStore unmodifiable(TaskManagerMetricStore source) {
    --- End diff --
    
    Then you pay the cost of copying all of the objects, which is much higher 
than the cost of synchronisation. But yes, you can do that. In that case you 
don't need concurrent hash maps. 
    
    If you intend to go in another direction with this and leave the current 
code on master as it is, please log the work for this in Jira after the release 
(I will close the release blocker issue for which I started this PR). However, 
my impression is that any follow-up work would be easier on top of this change, 
and in the meantime this is still way better than the current code on the 
master branch with its external synchronisation.  


> MetricStore makes cpu spin at 100%
> ----------------------------------
>
>                 Key: FLINK-7368
>                 URL: https://issues.apache.org/jira/browse/FLINK-7368
>             Project: Flink
>          Issue Type: Bug
>          Components: Metrics
>            Reporter: Nico Chen
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>         Attachments: MyHashMap.java, MyHashMapInfiniteLoopTest.java, 
> jm-jstack.log
>
>
> Flink's `MetricStore` is not thread-safe. Multiple threads may access the 
> Java HashMap inside `MetricStore` and can trigger the HashMap's infinite loop. 
> Recently I hit a case where the Flink JobManager consumed 100% CPU. Part of 
> the stack trace is shown below. The full jstack is in the attachment.
> {code:java}
> "ForkJoinPool-1-worker-19" daemon prio=10 tid=0x00007fbdacac9800 nid=0x64c1 runnable [0x00007fbd7d1c2000]
>    java.lang.Thread.State: RUNNABLE
>         at java.util.HashMap.put(HashMap.java:494)
>         at org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
>         at org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
>         at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
>         at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
>         at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
>         at akka.dispatch.OnSuccess.internal(Future.scala:212)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>         at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>         at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
>         at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>         at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
>         at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
>         at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
>         at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
>         at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
>         at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
> {code}
> There are 24 threads showing the same stack trace as above, which indicates 
> they are spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). 
> Many posts indicate that multiple threads accessing a HashMap cause this 
> problem, and I reproduced the case as well. The test code is attached. I only 
> modified HashMap.transfer() by adding concurrent barriers for different 
> threads in order to simulate the timing that creates cycles in the HashMap's 
> Entry. My program's stack trace shows it hangs at the same line 
> (HashMap.put(HashMap.java:494)) as the stack trace I posted above.
>  Even though `MetricFetcher` has a 10-second minimum interval between 
> metrics queries, it still cannot guarantee that query responses do not access 
> `MetricStore`'s HashMap concurrently. Thus I think it is a bug to fix.
>  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
