Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4840#discussion_r146552100
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
---
@@ -38,29 +43,136 @@
import static
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
import static
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
import static
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Nested data-structure to store metrics.
- *
- * <p>This structure is not thread-safe.
*/
+@ThreadSafe
public class MetricStore {
private static final Logger LOG =
LoggerFactory.getLogger(MetricStore.class);
- final JobManagerMetricStore jobManager = new JobManagerMetricStore();
- final Map<String, TaskManagerMetricStore> taskManagers = new
HashMap<>();
- final Map<String, JobMetricStore> jobs = new HashMap<>();
+ private final ComponentMetricStore jobManager = new
ComponentMetricStore();
+ private final Map<String, TaskManagerMetricStore> taskManagers = new
ConcurrentHashMap<>();
+ private final Map<String, JobMetricStore> jobs = new
ConcurrentHashMap<>();
+
+ /**
+ * Remove not active task managers.
+ *
+ * @param activeTaskManagers to retain.
+ */
+ public synchronized void retainTaskManagers(List<String>
activeTaskManagers) {
--- End diff --
but now we're paying the synchronization costs twice, once for the
synchronized keyword and once again for every access to the map. If every
method is synchronized the maps don't have to be concurrent hash maps.
---