[
https://issues.apache.org/jira/browse/FLINK-29134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17603438#comment-17603438
]
Sitan Pang commented on FLINK-29134:
------------------------------------
Hi, [~chesnay]. Any thoughts on this issue?
> fetch metrics may cause oom(ThreadPool task pile up)
> ----------------------------------------------------
>
> Key: FLINK-29134
> URL: https://issues.apache.org/jira/browse/FLINK-29134
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Metrics
> Affects Versions: 1.11.0, 1.15.2
> Reporter: Sitan Pang
> Priority: Major
> Attachments: dump-queueTask.png, dump-threadPool.png
>
>
> When we query metrics, we use a thread pool to process the data
> returned by the TMs.
> {code:java}
> private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
>     LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
>     queryServiceGateway
>             .queryMetrics(timeout)
>             .whenCompleteAsync(
>                     (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
>                         if (t != null) {
>                             LOG.debug("Fetching metrics failed.", t);
>                         } else {
>                             metrics.addAll(deserializer.deserialize(result));
>                         }
>                     },
>                     executor);
> } {code}
> The only condition for fetching metrics is that the time since the last
> update is larger than updateInterval:
> {code:java}
> public void update() {
>     synchronized (this) {
>         long currentTime = System.currentTimeMillis();
>         if (currentTime - lastUpdateTime > updateInterval) {
>             lastUpdateTime = currentTime;
>             fetchMetrics();
>         }
>     }
> } {code}
> Therefore, if the returned data cannot be processed within one update
> interval, tasks pile up in the thread pool and the metrics data they hold
> accumulates.
> Besides, webMonitorEndpoint, the REST handlers, and the metric fetcher share
> the same thread pool. When the web UI is open, the situation may be even worse.
> {code:java}
> final ScheduledExecutorService executor =
>         WebMonitorEndpoint.createExecutorService(
>                 configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
>                 configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
>                 "DispatcherRestEndpoint");
> final long updateInterval =
>         configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
> final MetricFetcher metricFetcher =
>         updateInterval == 0
>                 ? VoidMetricFetcher.INSTANCE
>                 : MetricFetcherImpl.fromConfiguration(
>                         configuration,
>                         metricQueryServiceRetriever,
>                         dispatcherGatewayRetriever,
>                         executor);
> webMonitorEndpoint =
>         restEndpointFactory.createRestEndpoint(
>                 configuration,
>                 dispatcherGatewayRetriever,
>                 resourceManagerGatewayRetriever,
>                 blobServer,
>                 executor,
>                 metricFetcher,
>                 highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
>                 fatalErrorHandler); {code}
>
>
>
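For illustration only, here is a minimal, self-contained sketch of the pile-up described in the quoted issue. It does not use any Flink classes: MetricPileUpSketch, backlogAfter and awaitQuietly are hypothetical names, and it assumes the shared pool behaves like a java.util.concurrent.ScheduledThreadPoolExecutor with a fixed number of threads and an unbounded queue. Each submitted task stands in for one metric-processing callback that does not finish before the next one arrives.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;

public class MetricPileUpSketch {

    /**
     * Submits `rounds` tasks that cannot finish to a one-thread pool and
     * returns how many of them are still waiting in the pool's queue.
     */
    public static int backlogAfter(int rounds) {
        // Assumption: stands in for the shared REST/metrics executor.
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        CountDownLatch firstStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < rounds; i++) {
                executor.execute(() -> {
                    firstStarted.countDown();
                    // Simulates processing that outlasts the update interval.
                    awaitQuietly(release);
                });
                if (i == 0) {
                    // Make sure the single worker is busy before queueing more.
                    awaitQuietly(firstStarted);
                }
            }
            // The queue is unbounded, so nothing is rejected; work just piles up.
            return executor.getQueue().size();
        } finally {
            release.countDown();
            executor.shutdownNow();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued tasks: " + backlogAfter(6));
    }
}
```

With one worker and six submissions, one task is running and the other five sit in the queue, each holding on to whatever data it captured. In the real fetcher that data would be the serialized metric dump, which is why a growing backlog could translate into heap pressure.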
--
This message was sent by Atlassian Jira
(v8.20.10#820010)