[ 
https://issues.apache.org/jira/browse/FLINK-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974188#comment-16974188
 ] 

Chesnay Schepler commented on FLINK-14565:
------------------------------------------

So here's the thing:

The SystemResources isn't such a special case; it is a data-structure from 
which multiple metrics are derived, which just happens to be updated regularly. 
In that sense, it is not to different from a Histogram, which creates the 
HistogramStatistics as the data-structure.

I wanted to generalize the histograms for quite a while to something I refer to 
as "complex metrics". Now we don't have time to do this properly for this 
release, but we could introduce a light variant that we can at least use 
internally.
Wwe can handle this entirely within the internal parts, not exposing anything 
to the public API.

The metric system already supports updating metrics in the background through 
the {{View}} interface. This is for example used to calculate the rate for 
meters.
What is missing is the concept of a metric that consists of multiple metrics:

Implementation steps:

1) Introduce this interface to flink-runtime:
{code}
public interface ComplexMetric extends Metric, View {
        Collection<Tuple2<String, Metric>> getMetrics();
}
{code}

2) Modify the SystemResourcesMetricsInitializer to work on AbstractMetricGroups 
instead, so we can access internal methods.

3) Expose AbstractMetricGroup#addMetric

4) Separate the SystemResourcesCounter into cpu/network specific metrics

5) Modify the SystemResourcesCounter to implement this interface.
getMetrics() returns a collection of tuples containing the name and metric 
object.
View#update() has to make some time checks to ensure the probe interval is 
respected.

6) Modify MetricRegistryImpl to check for the ComplexMetrics type of the metric 
in register/unregister. If found, iterate over contained metrics and notify 
about these instead.
Example for notifyAddedMetric:
{code}
if (metric instanceof ComplexMetric) {
        ComplexMetric complexMetric = (ComplexMetric) metric;
        for (Tuple2<String, Metric> subMetric : complexMetric.getMetrics()) {
                reporter.notifyOfAddedMetric(subMetric.f1, subMetric.f0, front);
        }
} else {
        reporter.notifyOfAddedMetric(metric, metricName, front);
}
{code}

7) Pass the SystemResourcesCounter instances to AMG#addMetric.


> Shutdown SystemResourcesCounter on (JM|TM)MetricGroup closed
> ------------------------------------------------------------
>
>                 Key: FLINK-14565
>                 URL: https://issues.apache.org/jira/browse/FLINK-14565
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Metrics
>            Reporter: Zili Chen
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, we start SystemResourcesCounter when initialize 
> (JM|TM)MetricGroup. This thread doesn't exit on (JM|TM)MetricGroup closed and 
> even there is not exit logic of them.
> It possibly causes thread leak. For example, on our platform which supports 
> previewing sample SQL execution, it starts a MiniCluster in the same process 
> as the platform. When the preview job finished MiniCluster closed and also 
> (JM|TM)MetricGroup. However these SystemResourcesCounter threads remain.
> I propose when creating SystemResourcesCounter, track it in 
> (JM|TM)MetricGroup, and on (JM|TM)MetricGroup closed, shutdown 
> SystemResourcesCounter. This way, we survive from thread leaks.
> CC [~chesnay] [~trohrmann]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to