Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4840#discussion_r146824651
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
 ---
    @@ -38,29 +43,136 @@
     import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
     import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
     import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Nested data-structure to store metrics.
    - *
    - * <p>This structure is not thread-safe.
      */
    +@ThreadSafe
     public class MetricStore {
        private static final Logger LOG = 
LoggerFactory.getLogger(MetricStore.class);
     
    -   final JobManagerMetricStore jobManager = new JobManagerMetricStore();
    -   final Map<String, TaskManagerMetricStore> taskManagers = new 
HashMap<>();
    -   final Map<String, JobMetricStore> jobs = new HashMap<>();
    +   private final ComponentMetricStore jobManager = new 
ComponentMetricStore();
    +   private final Map<String, TaskManagerMetricStore> taskManagers = new 
ConcurrentHashMap<>();
    +   private final Map<String, JobMetricStore> jobs = new 
ConcurrentHashMap<>();
    +
    +   /**
    +    * Remove not active task managers.
    +    *
    +    * @param activeTaskManagers to retain.
    +    */
    +   public synchronized void retainTaskManagers(List<String> 
activeTaskManagers) {
    +           taskManagers.keySet().retainAll(activeTaskManagers);
    +   }
    +
    +   /**
    +    * Remove not active task managers.
    +    *
    +    * @param activeJobs to retain.
    +    */
    +   public synchronized void retainJobs(List<String> activeJobs) {
    +           jobs.keySet().retainAll(activeJobs);
    +   }
    +
    +   /**
    +    * Add metric dumps to the store.
    +    *
    +    * @param metricDumps to add.
    +    */
    +   public synchronized void addAll(List<MetricDump> metricDumps) {
    --- End diff --
    
    This method should be package-private.


---

Reply via email to