[ https://issues.apache.org/jira/browse/FLINK-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974188#comment-16974188 ]
Chesnay Schepler commented on FLINK-14565:
------------------------------------------

So here's the thing: the SystemResources isn't such a special case. It is a data structure from which multiple metrics are derived, and it just happens to be updated regularly. In that sense, it is not too different from a Histogram, which creates the HistogramStatistics as its data structure.

I have wanted to generalize histograms for quite a while into something I refer to as "complex metrics". We don't have time to do this properly for this release, but we could introduce a light variant that we can at least use internally. We can handle this entirely within the internal parts, without exposing anything in the public API.

The metric system already supports updating metrics in the background through the {{View}} interface; this is used, for example, to calculate the rate for meters. What is missing is the concept of a metric that consists of multiple metrics.

Implementation steps:

1) Introduce this interface to flink-runtime:
{code}
import java.util.Collection;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.View;

public interface ComplexMetric extends Metric, View {
    // (name, sub-metric) pairs that are reported in place of this metric
    Collection<Tuple2<String, Metric>> getMetrics();
}
{code}

2) Modify the SystemResourcesMetricsInitializer to work on AbstractMetricGroups instead, so that we can access internal methods.

3) Expose AbstractMetricGroup#addMetric.

4) Separate the SystemResourcesCounter into CPU-specific and network-specific metrics.

5) Modify the SystemResourcesCounter to implement this interface. getMetrics() returns a collection of tuples containing the name and the metric object. View#update() has to make some time checks to ensure that the probe interval is respected.

6) Modify MetricRegistryImpl to check in register/unregister whether the metric is a ComplexMetric. If it is, iterate over the contained metrics and notify the reporters about these instead.
Example for notifyOfAddedMetric:
{code}
if (metric instanceof ComplexMetric) {
    // Report each sub-metric under its own name instead of the container itself.
    ComplexMetric complexMetric = (ComplexMetric) metric;
    for (Tuple2<String, Metric> subMetric : complexMetric.getMetrics()) {
        reporter.notifyOfAddedMetric(subMetric.f1, subMetric.f0, front);
    }
} else {
    reporter.notifyOfAddedMetric(metric, metricName, front);
}
{code}

7) Pass the SystemResourcesCounter instances to AbstractMetricGroup#addMetric.

> Shutdown SystemResourcesCounter on (JM|TM)MetricGroup closed
> ------------------------------------------------------------
>
>                 Key: FLINK-14565
>                 URL: https://issues.apache.org/jira/browse/FLINK-14565
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Metrics
>            Reporter: Zili Chen
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, we start a SystemResourcesCounter when initializing a
> (JM|TM)MetricGroup. This thread doesn't exit when the (JM|TM)MetricGroup is
> closed; in fact, there is no exit logic for it at all.
> This can cause a thread leak. For example, our platform supports previewing
> the execution of sample SQL by starting a MiniCluster in the same process as
> the platform. When the preview job finishes, the MiniCluster is closed, and so
> is its (JM|TM)MetricGroup. However, the SystemResourcesCounter threads remain.
> I propose that, when a SystemResourcesCounter is created, it is tracked in the
> (JM|TM)MetricGroup, and that it is shut down when the (JM|TM)MetricGroup is
> closed. This way, we avoid the thread leak.
> CC [~chesnay] [~trohrmann]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
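The ComplexMetric approach from the comment (steps 1, 5 and 6) can be illustrated with a minimal, self-contained Java sketch. It deliberately does not depend on Flink: `Metric`, `View` and `Gauge` are simplified stand-ins for the Flink interfaces of the same name, `Map.Entry` replaces `Tuple2<String, Metric>`, and `CpuMetrics`, `Reporter`, `register`, the injected clock/probe and the sub-metric names `Usage`/`Idle` are all hypothetical. It shows two things: `update()` skips probing until the probe interval has elapsed, and registration fans a complex metric out into one reporter notification per sub-metric.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical, simplified sketch; this is not Flink's actual API.
public class ComplexMetricSketch {

    // Simplified stand-ins for Flink's Metric, View and Gauge (only what this sketch needs).
    interface Metric {}

    interface View {
        void update();
    }

    interface Gauge<T> extends Metric {
        T getValue();
    }

    // Step 1: a metric made of named sub-metrics; Map.Entry stands in for Tuple2<String, Metric>.
    interface ComplexMetric extends Metric, View {
        Collection<Map.Entry<String, Metric>> getMetrics();
    }

    // Hypothetical reporter callback, reduced from MetricReporter#notifyOfAddedMetric.
    interface Reporter {
        void notifyOfAddedMetric(Metric metric, String metricName);
    }

    // Hypothetical CPU-only counter (step 5). Clock and probe are injected for determinism.
    static final class CpuMetrics implements ComplexMetric {
        private final long probeIntervalMillis;
        private final LongSupplier clock;
        private final LongSupplier probe; // returns a CPU usage percentage
        private boolean probed;
        private long lastProbeMillis;
        private volatile long cpuUsage;
        int probeCount; // number of real probes, exposed for the usage example

        CpuMetrics(long probeIntervalMillis, LongSupplier clock, LongSupplier probe) {
            this.probeIntervalMillis = probeIntervalMillis;
            this.clock = clock;
            this.probe = probe;
        }

        @Override
        public void update() {
            long now = clock.getAsLong();
            // update() may be called more often than the probe interval; skip until it has elapsed.
            if (probed && now - lastProbeMillis < probeIntervalMillis) {
                return;
            }
            probed = true;
            lastProbeMillis = now;
            cpuUsage = probe.getAsLong();
            probeCount++;
        }

        @Override
        public Collection<Map.Entry<String, Metric>> getMetrics() {
            Gauge<Long> usage = () -> cpuUsage;
            Gauge<Long> idle = () -> 100 - cpuUsage;
            return List.of(Map.<String, Metric>entry("Usage", usage), Map.<String, Metric>entry("Idle", idle));
        }
    }

    // Step 6: unpack complex metrics and notify the reporter once per sub-metric.
    static void register(Metric metric, String metricName, Reporter reporter) {
        if (metric instanceof ComplexMetric) {
            for (Map.Entry<String, Metric> sub : ((ComplexMetric) metric).getMetrics()) {
                reporter.notifyOfAddedMetric(sub.getValue(), sub.getKey());
            }
        } else {
            reporter.notifyOfAddedMetric(metric, metricName);
        }
    }

    public static void main(String[] args) {
        long[] now = {0};
        CpuMetrics cpu = new CpuMetrics(1000, () -> now[0], () -> 30);
        List<String> names = new ArrayList<>();
        register(cpu, "cpu", (m, n) -> names.add(n));
        System.out.println(names); // [Usage, Idle]

        cpu.update(); // t=0: first probe
        now[0] = 500;
        cpu.update(); // t=500: skipped, interval not yet elapsed
        now[0] = 1000;
        cpu.update(); // t=1000: probes again
        System.out.println(cpu.probeCount); // 2
    }
}
```

Because the probing happens inside `View#update()`, the periodic work would be driven by whatever schedules view updates rather than by a thread owned by the counter. Assuming the registry stops updating a view once its metric is unregistered, nothing would keep running after the (JM|TM)MetricGroup is closed; this is an inference from the comment, not something it states.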