[
https://issues.apache.org/jira/browse/FLINK-29134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sitan Pang updated FLINK-29134:
-------------------------------
Description:
In queryMetrics, we use a thread pool to process the data returned by the
TMs.
{code:java}
private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
    LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
    queryServiceGateway
            .queryMetrics(timeout)
            .whenCompleteAsync(
                    (MetricDumpSerialization.MetricSerializationResult result,
                            Throwable t) -> {
                        if (t != null) {
                            LOG.debug("Fetching metrics failed.", t);
                        } else {
                            metrics.addAll(deserializer.deserialize(result));
                        }
                    },
                    executor);
}
{code}
The only condition for fetching metrics is that more than updateInterval has
elapsed since the last update:
{code:java}
public void update() {
    synchronized (this) {
        long currentTime = System.currentTimeMillis();
        if (currentTime - lastUpdateTime > updateInterval) {
            lastUpdateTime = currentTime;
            fetchMetrics();
        }
    }
}
{code}
Therefore, if the returned data cannot be processed within one update
interval, unprocessed metric data will accumulate.
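To illustrate the accumulation, here is a minimal, self-contained sketch. It is not Flink code: the class PileUpDemo, the method queuedAfter, the 1 KiB payload and the one-thread pool are all assumptions made for the demonstration, and a blocked worker stands in for slow deserialization. Each simulated fetch round hands one callback to the executor via whenCompleteAsync, and because the worker never gets to them, every callback (and the payload it references) stays in the unbounded queue.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PileUpDemo {

    /** Simulates {@code rounds} fetches while the only worker is busy; returns the queue size. */
    static int queuedAfter(int rounds) {
        // One worker and an unbounded queue (assumed setup for the demo).
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(
                        1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy the only worker; this stands in for a slow deserialization.
            executor.execute(
                    () -> {
                        workerBusy.countDown();
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
            workerBusy.await();

            // Every round completes immediately and schedules its callback on the executor.
            for (int i = 0; i < rounds; i++) {
                CompletableFuture.completedFuture(new byte[1024])
                        .whenCompleteAsync((payload, t) -> { }, executor);
            }
            // Nothing was processed, so all callbacks are still waiting.
            return executor.getQueue().size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the worker", e);
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Queued callbacks: " + queuedAfter(100));
    }
}
```

In this sketch the backlog grows by one entry per round for as long as the worker stays busy. Nothing bounds it, which matches the task pile-up reported in this issue.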
Besides, the webMonitorEndpoint, the REST handlers and the metric fetcher all
share the same thread pool. When the web UI is open, it may be even worse.
{code:java}
final ScheduledExecutorService executor =
        WebMonitorEndpoint.createExecutorService(
                configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                "DispatcherRestEndpoint");
final long updateInterval =
        configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
final MetricFetcher metricFetcher =
        updateInterval == 0
                ? VoidMetricFetcher.INSTANCE
                : MetricFetcherImpl.fromConfiguration(
                        configuration,
                        metricQueryServiceRetriever,
                        dispatcherGatewayRetriever,
                        executor);
webMonitorEndpoint =
        restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                fatalErrorHandler);
{code}
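One possible mitigation, sketched here under assumptions and not taken from any Flink patch, is to process metric dumps on a dedicated executor and to skip a fetch while the previous one is still pending. The class GuardedFetcher, the method tryFetch and the thread name are hypothetical. The sketch also simplifies to a single query source, whereas the real fetcher queries many TMs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

class GuardedFetcher {

    // Dedicated pool, so slow metric processing cannot starve other users (assumed design).
    private final ExecutorService metricsExecutor =
            Executors.newSingleThreadExecutor(
                    r -> {
                        Thread thread = new Thread(r, "metric-fetcher");
                        thread.setDaemon(true);
                        return thread;
                    });

    // True while a fetch has been started and its callback has not finished yet.
    private final AtomicBoolean inFlight = new AtomicBoolean(false);

    /** Starts a fetch unless one is still pending; returns whether a fetch was started. */
    boolean tryFetch(Supplier<CompletableFuture<byte[]>> query, Consumer<byte[]> process) {
        if (!inFlight.compareAndSet(false, true)) {
            return false; // previous round not processed yet: skip instead of queueing more
        }
        try {
            query.get()
                    .whenCompleteAsync(
                            (payload, t) -> {
                                try {
                                    if (t == null) {
                                        process.accept(payload);
                                    }
                                } finally {
                                    inFlight.set(false); // allow the next round
                                }
                            },
                            metricsExecutor);
        } catch (RuntimeException e) {
            inFlight.set(false);
            throw e;
        }
        return true;
    }

    void close() {
        metricsExecutor.shutdownNow();
    }
}
```

With this guard at most one callback per fetcher is queued at any time, at the cost of serving slightly staler metrics when processing is slow. Whether that trade-off is acceptable for the web UI is a design decision for the project.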
was:
In queryMetrics, we use a thread pool to process the data returned by the
TMs.
{code:java}
private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
    LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
    queryServiceGateway
            .queryMetrics(timeout)
            .whenCompleteAsync(
                    (MetricDumpSerialization.MetricSerializationResult result,
                            Throwable t) -> {
                        if (t != null) {
                            LOG.debug("Fetching metrics failed.", t);
                        } else {
                            metrics.addAll(deserializer.deserialize(result));
                        }
                    },
                    executor);
}
{code}
The only condition for fetching metrics is that more than updateInterval has
elapsed since the last update:
{code:java}
public void update() {
    synchronized (this) {
        long currentTime = System.currentTimeMillis();
        if (currentTime - lastUpdateTime > updateInterval) {
            lastUpdateTime = currentTime;
            fetchMetrics();
        }
    }
}
{code}
Therefore, if the returned data cannot be processed within one update
interval, unprocessed metric data will accumulate.
Besides, the REST handlers and the metric fetcher share the thread pool. When
the web UI is open, it may be even worse.
> fetch metrics may cause oom(ThreadPool task pile up)
> ----------------------------------------------------
>
> Key: FLINK-29134
> URL: https://issues.apache.org/jira/browse/FLINK-29134
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Metrics
> Affects Versions: 1.11.0
> Reporter: Sitan Pang
> Priority: Major
> Attachments: dump-queueTask.png, dump-threadPool.png
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)