Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4840#discussion_r146824632
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
---
@@ -38,29 +43,136 @@
import static
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
import static
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
import static
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Nested data-structure to store metrics.
- *
- * <p>This structure is not thread-safe.
*/
+@ThreadSafe
public class MetricStore {
private static final Logger LOG =
LoggerFactory.getLogger(MetricStore.class);
- final JobManagerMetricStore jobManager = new JobManagerMetricStore();
- final Map<String, TaskManagerMetricStore> taskManagers = new
HashMap<>();
- final Map<String, JobMetricStore> jobs = new HashMap<>();
+ private final ComponentMetricStore jobManager = new
ComponentMetricStore();
+ private final Map<String, TaskManagerMetricStore> taskManagers = new
ConcurrentHashMap<>();
+ private final Map<String, JobMetricStore> jobs = new
ConcurrentHashMap<>();
+
+ /**
+ * Remove not active task managers.
+ *
+ * @param activeTaskManagers to retain.
+ */
+ public synchronized void retainTaskManagers(List<String>
activeTaskManagers) {
+ taskManagers.keySet().retainAll(activeTaskManagers);
+ }
+
+ /**
+ * Remove not active task managers.
+ *
+ * @param activeJobs to retain.
+ */
+ public synchronized void retainJobs(List<String> activeJobs) {
--- End diff --
`retainJobs` can be package-private.
---