[ https://issues.apache.org/jira/browse/FLINK-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974188#comment-16974188 ]
Chesnay Schepler commented on FLINK-14565:
------------------------------------------
So here's the thing:
The SystemResources isn't such a special case; it is a data structure from
which multiple metrics are derived, and it just happens to be updated regularly.
In that sense, it is not too different from a Histogram, which creates
HistogramStatistics as its data structure.
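To make the analogy concrete, here is a minimal sketch of one regularly refreshed data structure from which several metrics are derived. It is self-contained, so it uses a simplified stand-in for Flink's {{Gauge}} interface; {{ResourceSnapshot}}, {{refresh()}} and the metric names are hypothetical and do not come from {{SystemResourcesCounter}}.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for org.apache.flink.metrics.Gauge (assumption, not the real interface).
interface Gauge<T> {
    T getValue();
}

// Hypothetical data structure, refreshed periodically, from which several metrics are
// derived, comparable to how a Histogram produces HistogramStatistics.
class ResourceSnapshot {
    private volatile double cpuUsage;
    private volatile long networkBytesReceived;

    // Called by whatever drives the periodic update; here the values are simply passed in.
    void refresh(double cpuUsage, long networkBytesReceived) {
        this.cpuUsage = cpuUsage;
        this.networkBytesReceived = networkBytesReceived;
    }

    // Each derived metric is a thin view that reads the current state of the shared snapshot.
    Map<String, Gauge<?>> derivedMetrics() {
        Map<String, Gauge<?>> metrics = new LinkedHashMap<>();
        metrics.put("CPU.Usage", () -> cpuUsage);
        metrics.put("Network.BytesReceived", () -> networkBytesReceived);
        return metrics;
    }
}
```

Because each gauge only reads the shared snapshot, the expensive probing happens once per refresh, regardless of how many derived metrics are reported.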
I have wanted to generalize histograms for quite a while into something I refer
to as "complex metrics". We don't have time to do this properly for this
release, but we could introduce a lightweight variant that we can at least use
internally.
We can handle this entirely within the internal parts, without exposing anything
in the public API.
The metric system already supports updating metrics in the background through
the {{View}} interface. This is used, for example, to calculate the rate of
meters.
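As an illustration of the {{View}} mechanism, the following sketch computes a rate over a sliding window purely from periodic {{update()}} calls, similar in spirit to how meters derive their rate. It is a simplified rewrite, not Flink's actual {{MeterView}}; the {{View}} stand-in, the 5-second {{UPDATE_INTERVAL_SECONDS}} value and {{RateView}} are assumptions made so the example is self-contained.

```java
import java.util.function.LongSupplier;

// Simplified stand-in for org.apache.flink.metrics.View (assumption: a 5-second update interval).
interface View {
    int UPDATE_INTERVAL_SECONDS = 5;

    void update();
}

// Hypothetical, simplified rate view: keeps recent counter samples in a ring buffer and
// recomputes the rate over a fixed time span whenever the metric system calls update().
class RateView implements View {
    private final LongSupplier count;
    private final int timeSpanSeconds;
    private final long[] samples;
    private int index = 0;
    private double rate = 0.0;

    RateView(LongSupplier count, int timeSpanSeconds) {
        // Round the time span down to a multiple of the update interval (at least one interval).
        int rounded = timeSpanSeconds - timeSpanSeconds % UPDATE_INTERVAL_SECONDS;
        this.timeSpanSeconds = Math.max(rounded, UPDATE_INTERVAL_SECONDS);
        this.samples = new long[this.timeSpanSeconds / UPDATE_INTERVAL_SECONDS + 1];
        this.count = count;
    }

    @Override
    public void update() {
        index = (index + 1) % samples.length;
        samples[index] = count.getAsLong();
        // The next slot in the ring holds the sample from timeSpanSeconds ago
        // (or 0 until the buffer has filled up).
        long oldest = samples[(index + 1) % samples.length];
        rate = (double) (samples[index] - oldest) / timeSpanSeconds;
    }

    double getRate() {
        return rate;
    }
}
```

The view never starts a thread of its own; it relies on the metric system calling {{update()}} at a fixed interval, which is the property the proposal below builds on.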
What is missing is the concept of a metric that consists of multiple metrics.
Implementation steps:
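1) Introduce this interface in flink-runtime:
{code}
import java.util.Collection;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.View;

public interface ComplexMetric extends Metric, View {
    Collection<Tuple2<String, Metric>> getMetrics(); // (name, metric) pairs reported in place of this object
}
{code}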
2) Modify the SystemResourcesMetricsInitializer to work on AbstractMetricGroups
instead, so that we can access internal methods.
3) Expose AbstractMetricGroup#addMetric.
4) Separate the SystemResourcesCounter into CPU- and network-specific metrics.
5) Modify the SystemResourcesCounter to implement this interface.
getMetrics() returns a collection of tuples containing the name and the metric
object.
View#update() has to perform time checks to ensure that the probe interval is
respected.
6) Modify MetricRegistryImpl to check in register()/unregister() whether the
metric is a ComplexMetric. If so, iterate over the contained metrics and notify
the reporters about each of them instead.
Example for notifyOfAddedMetric:
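{code}
// Inside the per-reporter loop of MetricRegistryImpl#register();
// front is the FrontMetricGroup for the current reporter.
if (metric instanceof ComplexMetric) {
    ComplexMetric complexMetric = (ComplexMetric) metric;
    // Report every sub-metric under its own name (f0) instead of the container.
    for (Tuple2<String, Metric> subMetric : complexMetric.getMetrics()) {
        reporter.notifyOfAddedMetric(subMetric.f1, subMetric.f0, front);
    }
} else {
    reporter.notifyOfAddedMetric(metric, metricName, front);
}
{code}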
7) Pass the SystemResourcesCounter instances to AbstractMetricGroup#addMetric.
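Taken together, the steps above could look roughly like the following sketch. It is self-contained, so {{Metric}}, {{View}}, {{Gauge}} and the reporter are simplified stand-ins for the Flink types, and {{Map.Entry}} replaces {{Tuple2}}. {{ProbingComplexMetric}} is a hypothetical stand-in for the reworked SystemResourcesCounter: instead of running its own thread, it probes inside {{update()}} and skips calls that arrive before the probe interval has elapsed (step 5). {{RegistrySketch#notifyAdded}} mirrors the dispatch from step 6.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Simplified stand-ins for Flink's Metric, View and Gauge (assumptions, not the real types).
interface Metric {}

interface View {
    void update();
}

interface Gauge<T> extends Metric {
    T getValue();
}

// Step 1, with Map.Entry<String, Metric> in place of Tuple2<String, Metric>.
interface ComplexMetric extends Metric, View {
    Collection<Map.Entry<String, Metric>> getMetrics();
}

// Hypothetical stand-in for the reworked SystemResourcesCounter (step 5): no thread of
// its own; probing happens in update(), at most once per probe interval.
class ProbingComplexMetric implements ComplexMetric {
    private final long probeIntervalMillis;
    private final LongSupplier clock; // injected so the interval check can be tested
    private boolean probedOnce = false;
    private long lastProbeMillis;
    private int probeCount; // placeholder for the actual CPU/network readings

    ProbingComplexMetric(long probeIntervalMillis, LongSupplier clock) {
        this.probeIntervalMillis = probeIntervalMillis;
        this.clock = clock;
    }

    @Override
    public synchronized void update() {
        long now = clock.getAsLong();
        if (probedOnce && now - lastProbeMillis < probeIntervalMillis) {
            return; // called too early: respect the probe interval
        }
        probedOnce = true;
        lastProbeMillis = now;
        probeCount++; // a real implementation would read system resources here
    }

    @Override
    public Collection<Map.Entry<String, Metric>> getMetrics() {
        List<Map.Entry<String, Metric>> metrics = new ArrayList<>();
        metrics.add(new SimpleImmutableEntry<String, Metric>(
                "Probe.Count", (Gauge<Integer>) this::getProbeCount));
        metrics.add(new SimpleImmutableEntry<String, Metric>(
                "Probe.LastTimestamp", (Gauge<Long>) this::getLastProbeMillis));
        return metrics;
    }

    synchronized int getProbeCount() {
        return probeCount;
    }

    synchronized long getLastProbeMillis() {
        return lastProbeMillis;
    }
}

// Simplified reporter callback; Flink's MetricReporter also receives the metric group.
interface Reporter {
    void notifyOfAddedMetric(Metric metric, String metricName);
}

class RegistrySketch {
    // Step 6: complex metrics are flattened into their sub-metrics before reporting.
    static void notifyAdded(Reporter reporter, Metric metric, String metricName) {
        if (metric instanceof ComplexMetric) {
            for (Map.Entry<String, Metric> sub : ((ComplexMetric) metric).getMetrics()) {
                reporter.notifyOfAddedMetric(sub.getValue(), sub.getKey());
            }
        } else {
            reporter.notifyOfAddedMetric(metric, metricName);
        }
    }
}
```

In this shape, the reporter never sees the container object, only its named sub-metrics, and no background thread exists that could outlive the metric group.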
> Shutdown SystemResourcesCounter on (JM|TM)MetricGroup closed
> ------------------------------------------------------------
>
> Key: FLINK-14565
> URL: https://issues.apache.org/jira/browse/FLINK-14565
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Metrics
> Reporter: Zili Chen
> Priority: Major
> Labels: pull-request-available
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Currently, we start the SystemResourcesCounter when initializing the
> (JM|TM)MetricGroup. This thread doesn't exit when the (JM|TM)MetricGroup is
> closed; there isn't even any exit logic for it.
> This can cause thread leaks. For example, our platform, which supports
> previewing sample SQL executions, starts a MiniCluster in the same process
> as the platform. When the preview job finishes, the MiniCluster is closed, and
> so is the (JM|TM)MetricGroup. However, these SystemResourcesCounter threads remain.
> I propose that, when creating the SystemResourcesCounter, we track it in the
> (JM|TM)MetricGroup, and shut the SystemResourcesCounter down when the
> (JM|TM)MetricGroup is closed. This way, we avoid thread leaks.
> CC [~chesnay] [~trohrmann]
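A minimal sketch of the proposal in the original description, assuming the counter runs as its own thread: the metric group records every closeable background component it starts and shuts them all down in {{close()}}. {{TrackingMetricGroup}} and {{ProbeThread}} are hypothetical names; the real SystemResourcesCounter and (JM|TM)MetricGroup classes may be structured differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical metric group that owns the background components it starts.
class TrackingMetricGroup implements AutoCloseable {
    private final List<AutoCloseable> ownedResources = new ArrayList<>();
    private boolean closed = false;

    synchronized void track(AutoCloseable resource) {
        if (closed) {
            closeQuietly(resource); // group already closed: don't let a late registration leak
            return;
        }
        ownedResources.add(resource);
    }

    @Override
    public synchronized void close() {
        if (closed) {
            return; // closing twice is a no-op
        }
        closed = true;
        for (AutoCloseable resource : ownedResources) {
            closeQuietly(resource);
        }
        ownedResources.clear();
    }

    private static void closeQuietly(AutoCloseable resource) {
        try {
            resource.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        } catch (Exception e) {
            // real code would log this
        }
    }
}

// Hypothetical stand-in for a thread-based SystemResourcesCounter.
class ProbeThread extends Thread implements AutoCloseable {
    private final long probeIntervalMillis;
    private volatile boolean running = true;

    ProbeThread(long probeIntervalMillis) {
        super("probe-thread");
        this.probeIntervalMillis = probeIntervalMillis;
        setDaemon(true);
    }

    @Override
    public void run() {
        while (running) {
            // probe CPU/network usage here
            try {
                Thread.sleep(probeIntervalMillis);
            } catch (InterruptedException e) {
                return; // shutdown requested
            }
        }
    }

    @Override
    public void close() throws InterruptedException {
        running = false;
        interrupt(); // wake the thread if it is sleeping
        join(); // wait until it has actually exited
    }
}
```

Joining in {{close()}} means that once the metric group is closed, the probe thread is guaranteed to have terminated rather than merely being asked to stop.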
--
This message was sent by Atlassian Jira
(v8.3.4#803005)