[ https://issues.apache.org/jira/browse/FLINK-33162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ruan Hang updated FLINK-33162:
------------------------------
Fix Version/s: 2.3.0
(was: 2.2.0)
> Separate the executor in DefaultDispatcherResourceManagerComponentFactory for
> MetricFetcher and webMonitorEndpoint
> ------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-33162
> URL: https://issues.apache.org/jira/browse/FLINK-33162
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / REST
> Affects Versions: 2.1.0
> Reporter: xiaogang zhou
> Priority: Major
> Fix For: 2.3.0
>
>
> When starting a job with a large number of TaskManagers, the JobManager of the
> job failed to respond to any REST request. Looking at the jstack output, we
> found that all 4 REST endpoint threads were occupied by the metric fetcher.
> {code:java}
> "Flink-DispatcherRestEndpoint-thread-4" #91 daemon prio=5 os_prio=0 tid=0x00007f17e7823000 nid=0x246 waiting for monitor entry [0x00007f178e9fe000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
>     - waiting to lock <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
>    Locked ownable synchronizers:
>     - <0x00000003ce80d8f0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
>
> "Flink-DispatcherRestEndpoint-thread-3" #88 daemon prio=5 os_prio=0 tid=0x00007f17e88af000 nid=0x243 waiting for monitor entry [0x00007f1790dfe000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
>     - waiting to lock <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
>    Locked ownable synchronizers:
>     - <0x00000003ce80df88> (a java.util.concurrent.ThreadPoolExecutor$Worker)
>
> "Flink-DispatcherRestEndpoint-thread-2" #79 daemon prio=5 os_prio=0 tid=0x00007f1793473800 nid=0x23a runnable [0x00007f17922fd000]
>    java.lang.Thread.State: RUNNABLE
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.add(MetricStore.java:216)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:82)
>     - locked <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
>    Locked ownable synchronizers:
>     - <0x00000003ce811120> (a java.util.concurrent.ThreadPoolExecutor$Worker)
>
> "Flink-DispatcherRestEndpoint-thread-1" #76 daemon prio=5 os_prio=0 tid=0x00007f17a56f5000 nid=0x237 waiting for monitor entry [0x00007f1792cfd000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
>     - waiting to lock <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
>    Locked ownable synchronizers:
>     - <0x00000003ce8115f0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> {code}
>
> I suggest enabling a rejection policy on the request executor so that
> requests it cannot handle are rejected:
> {code:java}
> Executors.newScheduledThreadPool(
>         numThreads,
>         new ExecutorThreadFactory.Builder()
>                 .setThreadPriority(threadPriority)
>                 .setPoolName("Flink-" + componentName)
>                 .build());
>
> // The same executor is passed to both the MetricFetcher and the REST endpoint.
> final MetricFetcher metricFetcher =
>         updateInterval == 0
>                 ? VoidMetricFetcher.INSTANCE
>                 : MetricFetcherImpl.fromConfiguration(
>                         configuration,
>                         metricQueryServiceRetriever,
>                         dispatcherGatewayRetriever,
>                         executor);
>
> webMonitorEndpoint =
>         restEndpointFactory.createRestEndpoint(
>                 configuration,
>                 dispatcherGatewayRetriever,
>                 resourceManagerGatewayRetriever,
>                 blobServer,
>                 executor,
>                 metricFetcher,
>                 highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
>                 fatalErrorHandler);
> {code}
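
The separation named in the issue title can be illustrated with a minimal, self-contained sketch. It is not Flink code: the class `SeparateExecutorsSketch`, the method `restAnswers`, and the 500 ms timeout are hypothetical, and a `CountDownLatch` stands in for the slow `MetricStore` update seen in the thread dump. It only shows, under those assumptions, why a REST task starves when it shares a 4-thread pool with long-running metric callbacks, and why it still runs when the metric work has its own pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SeparateExecutorsSketch {

    // Same pool size as the four threads in the thread dump.
    static final int NUM_THREADS = 4;

    /**
     * Occupies NUM_THREADS threads with stand-in "metric" tasks, then submits one
     * stand-in "REST" task and reports whether it completed within 500 ms.
     * With separatePools == false both kinds of task share a single pool.
     */
    static boolean restAnswers(boolean separatePools) {
        ExecutorService restExecutor = Executors.newScheduledThreadPool(NUM_THREADS);
        ExecutorService metricExecutor =
                separatePools ? Executors.newScheduledThreadPool(NUM_THREADS) : restExecutor;
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < NUM_THREADS; i++) {
                // Hypothetical stand-in for a metric callback stuck on a slow store update.
                metricExecutor.submit(() -> {
                    release.await();
                    return null;
                });
            }
            // Hypothetical stand-in for a REST handler that should answer quickly.
            Future<String> response = restExecutor.submit(() -> "ok");
            try {
                response.get(500, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false; // no free thread picked up the REST task in time
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            release.countDown();
            restExecutor.shutdownNow();
            metricExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("shared pool answers REST:   " + restAnswers(false));
        System.out.println("separate pools answer REST: " + restAnswers(true));
    }
}
```

In this sketch the shared-pool run should time out because all four workers are parked on the latch, while the separate-pool run should return promptly. Whether a dedicated pool, a rejection policy, or both is the right fix for Flink itself is left to the issue discussion.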