[ 
https://issues.apache.org/jira/browse/FLINK-18663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162028#comment-17162028
 ] 

tartarus commented on FLINK-18663:
----------------------------------

[~chesnay] thanks for your reply.

 

This problem does not occur frequently, but when the JM is under heavy load 
(for example during GC), a call to /jobs/overview may fail with this exception:
{code:java}
2020-07-21 12:39:21,458 WARN  
org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler[flink-scheduler-1]
  - ##### requestProcessingFuture url = /jobs/overview, throwable
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask 
timed out on [Actor[akka://flink/user/dispatcher#-919763123]] after [10000 ms]. 
Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A 
typical reason for `AskTimeoutException` is that
 the recipient actor didn't send a reply.
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:878)
        at akka.dispatch.OnComplete.internal(Future.scala:263)
        at akka.dispatch.OnComplete.internal(Future.scala:261)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
        at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
        at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
        at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644)
        at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
        at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
        at 
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
        at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
        at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
        at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
        at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
        at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
        at java.lang.Thread.run(Thread.java:745)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/dispatcher#-919763123]] after [10000 ms]. Message of 
type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical 
reason for `AskTimeoutException` is that the recipient actor didn't send
a reply.
        at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
        at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
        at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
        ... 9 more
{code}
There seem to be few changes to Netty, apart from FLINK-16626.

We found that the problem does indeed occur when canceling the Flink job.

Here is some debug info. Exception handling under normal circumstances:
!110.png!

Exception handling in the abnormal case:

!111.png!
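
For reference, the NPE in the handleException code quoted from the issue description comes from dereferencing the result of the pipeline lookup. Below is a minimal sketch of a null guard, not the actual fix. The Aggregator class, the Map used as a pipeline, the overhead value, and the fallback length are all hypothetical stand-ins so that the snippet runs without Netty or Flink on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

public class MaxLengthGuard {
    // Hypothetical stand-in for FlinkHttpObjectAggregator (only the method used here).
    static final class Aggregator {
        private final int maxContentLength;

        Aggregator(int maxContentLength) {
            this.maxContentLength = maxContentLength;
        }

        int maxContentLength() {
            return maxContentLength;
        }
    }

    // Assumed value for illustration only.
    static final int OTHER_RESP_PAYLOAD_OVERHEAD = 1024;
    // Assumption: do not truncate at all when no aggregator is available.
    static final int FALLBACK_MAX_LENGTH = Integer.MAX_VALUE;

    // The Map plays the role of ctx.pipeline(); a missing entry models the null lookup.
    static int maxLength(Map<Class<?>, Object> pipeline) {
        Aggregator aggregator = (Aggregator) pipeline.get(Aggregator.class);
        if (aggregator == null) {
            // Guard: fall back instead of throwing a NullPointerException.
            return FALLBACK_MAX_LENGTH;
        }
        return aggregator.maxContentLength() - OTHER_RESP_PAYLOAD_OVERHEAD;
    }

    public static void main(String[] args) {
        Map<Class<?>, Object> pipeline = new HashMap<>();
        System.out.println(maxLength(pipeline)); // aggregator missing: fallback is used
        pipeline.put(Aggregator.class, new Aggregator(10_000));
        System.out.println(maxLength(pipeline)); // aggregator present: limit minus overhead
    }
}
```

Which fallback is appropriate (no truncation, or a fixed default limit) is a design decision for the real handler; the sketch only shows that the error path can stay alive when the lookup returns null.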

 

 

> Fix Flink On YARN AM not exit
> -----------------------------
>
>                 Key: FLINK-18663
>                 URL: https://issues.apache.org/jira/browse/FLINK-18663
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / REST
>    Affects Versions: 1.10.0, 1.10.1, 1.11.0
>            Reporter: tartarus
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: 110.png, 111.png, 
> C49A7310-F932-451B-A203-6D17F3140C0D.png, e18e00dd6664485c2ff55284fe969474.png
>
>
> AbstractHandler throws an NPE because FlinkHttpObjectAggregator is null.
> When a REST handler throws an exception, this code runs:
> {code:java}
> private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
>       // May be null in some cases, which makes the next line throw an NPE.
>       FlinkHttpObjectAggregator flinkHttpObjectAggregator = ctx.pipeline().get(FlinkHttpObjectAggregator.class);
>       int maxLength = flinkHttpObjectAggregator.maxContentLength() - OTHER_RESP_PAYLOAD_OVERHEAD;
>       if (throwable instanceof RestHandlerException) {
>               RestHandlerException rhe = (RestHandlerException) throwable;
>               String stackTrace = ExceptionUtils.stringifyException(rhe);
>               String truncatedStackTrace = Ascii.truncate(stackTrace, maxLength, "...");
>               if (log.isDebugEnabled()) {
>                       log.error("Exception occurred in REST handler.", rhe);
>               } else {
>                       log.error("Exception occurred in REST handler: {}", rhe.getMessage());
>               }
>               return HandlerUtils.sendErrorResponse(
>                       ctx,
>                       httpRequest,
>                       new ErrorResponseBody(truncatedStackTrace),
>                       rhe.getHttpResponseStatus(),
>                       responseHeaders);
>       } else {
>               log.error("Unhandled exception.", throwable);
>               String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
>                       ExceptionUtils.stringifyException(throwable));
>               String truncatedStackTrace = Ascii.truncate(stackTrace, maxLength, "...");
>               return HandlerUtils.sendErrorResponse(
>                       ctx,
>                       httpRequest,
>                       new ErrorResponseBody(Arrays.asList("Internal server error.", truncatedStackTrace)),
>                       HttpResponseStatus.INTERNAL_SERVER_ERROR,
>                       responseHeaders);
>       }
> }
> {code}
> However, flinkHttpObjectAggregator is null in some cases, so this throws an NPE.
> This method is called from AbstractHandler#respondAsLeader:
> {code:java}
> requestProcessingFuture
>       .whenComplete((Void ignored, Throwable throwable) -> {
>               if (throwable != null) {
>                       handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
>                               .whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
>               } else {
>                       finalizeRequestProcessing(finalUploadedFiles);
>               }
>       });
> {code}
> As a result, the InFlightRequestTracker is never cleared, so the
> CompletableFuture returned by the handler's closeAsync never completes.
> !C49A7310-F932-451B-A203-6D17F3140C0D.png!
> !e18e00dd6664485c2ff55284fe969474.png!
>  
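
To illustrate why the quoted respondAsLeader callback never reaches finalizeRequestProcessing, here is a small self-contained sketch using only java.util.concurrent. The method failingHandleException and the AtomicBoolean flag are hypothetical stand-ins for handleException hitting the NPE and for finalizeRequestProcessing. The second method shows one possible guard, which is an assumption and not necessarily what the real fix does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class FinalizeDemo {
    // Stand-in for handleException(...) throwing before it can return a future.
    static CompletableFuture<Void> failingHandleException() {
        throw new NullPointerException("flinkHttpObjectAggregator is null");
    }

    // Same shape as the quoted callback: the NPE is thrown inside the
    // whenComplete action, the CompletableFuture machinery catches it, and
    // the inner whenComplete that would finalize the request is never registered.
    static boolean finalizedWithoutGuard() {
        AtomicBoolean finalized = new AtomicBoolean(false);
        CompletableFuture<Void> requestProcessingFuture = new CompletableFuture<>();
        requestProcessingFuture.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                failingHandleException()
                    .whenComplete((ignored2, throwable2) -> finalized.set(true));
            } else {
                finalized.set(true);
            }
        });
        requestProcessingFuture.completeExceptionally(new RuntimeException("ask timed out"));
        return finalized.get();
    }

    // Possible guard (assumption): turn a synchronous failure into a failed
    // future so that the finalization step always runs.
    static boolean finalizedWithGuard() {
        AtomicBoolean finalized = new AtomicBoolean(false);
        CompletableFuture<Void> requestProcessingFuture = new CompletableFuture<>();
        requestProcessingFuture.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                CompletableFuture<Void> responseFuture;
                try {
                    responseFuture = failingHandleException();
                } catch (Throwable t) {
                    responseFuture = new CompletableFuture<>();
                    responseFuture.completeExceptionally(t);
                }
                responseFuture.whenComplete((ignored2, throwable2) -> finalized.set(true));
            } else {
                finalized.set(true);
            }
        });
        requestProcessingFuture.completeExceptionally(new RuntimeException("ask timed out"));
        return finalized.get();
    }

    public static void main(String[] args) {
        System.out.println("without guard: " + finalizedWithoutGuard());
        System.out.println("with guard:    " + finalizedWithGuard());
    }
}
```

In this sketch the unguarded variant returns false and the guarded one returns true. The unguarded case matches the reported symptom: the in-flight request is never deregistered, so anything waiting on it cannot complete.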



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
