[ https://issues.apache.org/jira/browse/FLINK-33162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weijie Guo updated FLINK-33162:
-------------------------------
    Affects Version/s: 2.1.0
                           (was: 1.13.1)

> Separate the executor in DefaultDispatcherResourceManagerComponentFactory for
> MetricFetcher and webMonitorEndpoint
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33162
>                 URL: https://issues.apache.org/jira/browse/FLINK-33162
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / REST
>    Affects Versions: 2.1.0
>            Reporter: xiaogang zhou
>            Priority: Major
>             Fix For: 2.0.0
>
>
> When starting a job with a large number of TaskManagers, the JobManager of the
> job failed to respond to any REST request. The jstack output showed that all 4
> REST endpoint threads were busy serving the metric fetcher: three were blocked
> waiting for the MetricStore lock, which was held by the fourth.
> {code:java}
> "Flink-DispatcherRestEndpoint-thread-4" #91 daemon prio=5 os_prio=0 tid=0x00007f17e7823000 nid=0x246 waiting for monitor entry [0x00007f178e9fe000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
>     - waiting to lock <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
>    Locked ownable synchronizers:
>     - <0x00000003ce80d8f0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
>
> "Flink-DispatcherRestEndpoint-thread-3" #88 daemon prio=5 os_prio=0 tid=0x00007f17e88af000 nid=0x243 waiting for monitor entry [0x00007f1790dfe000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
>     - waiting to lock <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
>    Locked ownable synchronizers:
>     - <0x00000003ce80df88> (a java.util.concurrent.ThreadPoolExecutor$Worker)
>
> "Flink-DispatcherRestEndpoint-thread-2" #79 daemon prio=5 os_prio=0 tid=0x00007f1793473800 nid=0x23a runnable [0x00007f17922fd000]
>    java.lang.Thread.State: RUNNABLE
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.add(MetricStore.java:216)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:82)
>     - locked <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
>    Locked ownable synchronizers:
>     - <0x00000003ce811120> (a java.util.concurrent.ThreadPoolExecutor$Worker)
>
> "Flink-DispatcherRestEndpoint-thread-1" #76 daemon prio=5 os_prio=0 tid=0x00007f17a56f5000 nid=0x237 waiting for monitor entry [0x00007f1792cfd000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
>     - waiting to lock <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
>     at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
>    Locked ownable synchronizers:
>     - <0x00000003ce8115f0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> {code}
>  
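> I suggest giving the executor a policy that rejects requests it cannot handle.
> At the moment, DefaultDispatcherResourceManagerComponentFactory passes the same
> executor to both the MetricFetcher and the webMonitorEndpoint: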
> {code:java}
> // REST executor; its threads are the "Flink-DispatcherRestEndpoint-thread-N" seen in the jstack above
> final ScheduledExecutorService executor =
>         Executors.newScheduledThreadPool(
>                 numThreads,
>                 new ExecutorThreadFactory.Builder()
>                         .setThreadPriority(threadPriority)
>                         .setPoolName("Flink-" + componentName)
>                         .build());
>
> // The executor is handed to the MetricFetcher...
> final MetricFetcher metricFetcher =
>         updateInterval == 0
>                 ? VoidMetricFetcher.INSTANCE
>                 : MetricFetcherImpl.fromConfiguration(
>                         configuration,
>                         metricQueryServiceRetriever,
>                         dispatcherGatewayRetriever,
>                         executor);
>
> // ...and to the REST endpoint, so slow metric callbacks can occupy every REST thread.
> webMonitorEndpoint =
>         restEndpointFactory.createRestEndpoint(
>                 configuration,
>                 dispatcherGatewayRetriever,
>                 resourceManagerGatewayRetriever,
>                 blobServer,
>                 executor,
>                 metricFetcher,
>                 highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
>                 fatalErrorHandler);
> {code}
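A minimal sketch of the rejection idea, using only JDK classes. One caveat: Executors.newScheduledThreadPool returns a ScheduledThreadPoolExecutor, whose work queue is unbounded, so its RejectedExecutionHandler is only consulted after shutdown. Bounding the backlog would therefore need something like a plain ThreadPoolExecutor with a bounded queue and AbortPolicy, as below. The class and method names (BoundedRestExecutor, create, countRejected) are hypothetical; this is not how Flink itself builds the REST executor.

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical REST executor that rejects work instead of queueing it without bound. */
class BoundedRestExecutor {

    /** Fixed-size pool; at most queueCapacity tasks may wait, further tasks are rejected. */
    static ThreadPoolExecutor create(int numThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                numThreads,
                numThreads,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()); // throws RejectedExecutionException when full
    }

    /** Submits {@code tasks} blocking tasks to a pool with 1 thread and a queue of 1; returns the rejection count. */
    static int countRejected(int tasks) {
        ThreadPoolExecutor executor = create(1, 1);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    executor.execute(() -> awaitQuietly(release)); // stands in for a slow request
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            executor.shutdown();
        }
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejected(4)); // prints "rejected: 2"
    }
}
{code}

With one thread and a queue of one, the first task runs, the second waits in the queue, and every further submission fails fast with RejectedExecutionException instead of piling up behind slow work.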

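The separation named in the issue title can be illustrated with a small model: a 4-thread pool (matching the four Flink-DispatcherRestEndpoint threads in the dump) whose threads are all taken by callbacks contending on one lock, standing in for the MetricStore monitor. The class and method names are hypothetical, and the lock holder waits on a latch instead of doing real work; it sketches the effect, not Flink's code.

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the REST executor being starved by metric-fetch callbacks. */
class MetricExecutorIsolation {

    // Stands in for the MetricStore monitor that every fetch callback synchronizes on.
    private static final Object METRIC_STORE_LOCK = new Object();
    private static final int NUM_THREADS = 4; // same thread count as in the jstack above

    /**
     * Occupies every metric thread with callbacks contending on one lock, then submits a REST
     * task. Returns true if the REST task runs within one second.
     */
    static boolean restServedWhileMetricsBusy(boolean separateMetricExecutor) {
        ScheduledExecutorService restExecutor = Executors.newScheduledThreadPool(NUM_THREADS);
        ScheduledExecutorService metricExecutor =
                separateMetricExecutor ? Executors.newScheduledThreadPool(NUM_THREADS) : restExecutor;
        CountDownLatch metricTasksStarted = new CountDownLatch(NUM_THREADS);
        CountDownLatch releaseMetrics = new CountDownLatch(1);
        CountDownLatch restServed = new CountDownLatch(1);
        try {
            for (int i = 0; i < NUM_THREADS; i++) {
                metricExecutor.execute(() -> {
                    metricTasksStarted.countDown();
                    synchronized (METRIC_STORE_LOCK) {
                        // The lock holder waits here (a stand-in for a slow MetricStore.addAll);
                        // the other callbacks stay blocked on the monitor.
                        awaitQuietly(releaseMetrics);
                    }
                });
            }
            metricTasksStarted.await(); // every metric thread is now occupied
            restExecutor.execute(restServed::countDown);
            return restServed.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            releaseMetrics.countDown();
            metricExecutor.shutdown();
            restExecutor.shutdown(); // no-op if both references point to the same pool
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("shared pool:   " + restServedWhileMetricsBusy(false));
        System.out.println("separate pool: " + restServedWhileMetricsBusy(true));
    }
}
{code}

With the shared pool the REST task has no free thread until the metric callbacks finish, so main prints false for the shared case and true for the separate one.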


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
