[
https://issues.apache.org/jira/browse/FLINK-18663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17166125#comment-17166125
]
Till Rohrmann edited comment on FLINK-18663 at 7/28/20, 8:37 AM:
-----------------------------------------------------------------
[~trohrmann] The JobManager log is too large, so I removed some of the YARN and
container log lines.
{{AbstractHandler.terminationFuture}} does not complete because
{{AbstractHandler.inFlightRequestTracker}} cannot be cleared.
[^jobmanager.log.noyarn.tar.gz]
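This is a simplified model of why {{terminationFuture}} depends on the tracker. It is a sketch under assumptions: {{RequestTracker}} and its methods are hypothetical names, and the {{Phaser}}-based design is one plausible way to build such a tracker, not a copy of Flink's class. The termination future completes only after closing has started and every registered request has deregistered. A request that never deregisters keeps it pending forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;

/**
 * Simplified, hypothetical model of an in-flight request tracker.
 * The termination future completes once close has been requested AND
 * every registered request has deregistered.
 */
public class InFlightTrackerDemo {

    static final class RequestTracker {
        private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        // One initial party represents the pending close call.
        private final Phaser phaser = new Phaser(1) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Invoked when the last remaining party arrives.
                terminationFuture.complete(null);
                return true; // terminate the phaser
            }
        };

        /** Registers a request; returns false if the tracker has already terminated. */
        boolean registerRequest() {
            return phaser.register() >= 0;
        }

        /** Must be called exactly once per registered request, on every code path. */
        void deregisterRequest() {
            phaser.arriveAndDeregister();
        }

        /** Starts closing; the returned future completes when no request is in flight. */
        CompletableFuture<Void> awaitAsync() {
            phaser.arriveAndDeregister();
            return terminationFuture;
        }
    }

    public static void main(String[] args) {
        RequestTracker tracker = new RequestTracker();
        tracker.registerRequest();                   // e.g. a GET /jobs/overview request
        CompletableFuture<Void> closed = tracker.awaitAsync();
        System.out.println("closed before deregister: " + closed.isDone()); // false
        tracker.deregisterRequest();                 // the step skipped when the error path throws
        System.out.println("closed after deregister: " + closed.isDone());  // true
    }
}
```

If the error path never reaches {{deregisterRequest()}}, the second check never flips to {{true}}. That matches the symptom described here.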
I added the log statements you suggested; you can filter for '#####'. The
{{log.info("Shutting RestServerEndpoint down internally")}} line is never reached
because {{org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler}} has not
closed yet. {{JobsOverviewHandler}} does not close because
{{AbstractHandler.inFlightRequestTracker}} is not cleared. That is caused by the
exception below, which was logged while the job's tasks were switching from
SCHEDULED to DEPLOYING:
{code:java}
2020-07-27 21:57:26,685 ERROR org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler[flink-scheduler-1] - ##### handle exception for url /jobs/overview
2020-07-27 21:57:26,685 ERROR org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler[flink-scheduler-1] - ##### handle exception for url /jobs/overview
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-88418157]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
	at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
	at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
	at java.lang.Thread.run(Thread.java:745)
2020-07-27 21:57:26,686 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [flink-akka.actor.default-dispatcher-48] - 1-1.1_Sink: Unnamed (533/1500) (1b0945713f48026b5c677a2d1559f78f) switched from SCHEDULED to DEPLOYING.
2020-07-27 21:57:26,686 ERROR org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler[flink-scheduler-1] - ##### handleException happened exception
java.lang.NullPointerException
	at org.apache.flink.runtime.rest.handler.AbstractHandler.handleException(AbstractHandler.java:204)
	at org.apache.flink.runtime.rest.handler.AbstractHandler.lambda$respondAsLeader$1(AbstractHandler.java:182)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:872)
	at akka.dispatch.OnComplete.internal(Future.scala:263)
	at akka.dispatch.OnComplete.internal(Future.scala:261)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644)
	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
	at java.lang.Thread.run(Thread.java:745)
{code}
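The second trace shows the NPE raised in {{AbstractHandler.handleException}} while running inside the {{whenComplete}} callback of {{respondAsLeader}} ({{CompletableFuture.uniWhenComplete}}). The sketch below reproduces that pattern using only the JDK. {{handleException}}, {{respondFragile}} and {{respondRobust}} are hypothetical stand-ins, not Flink code. The NPE does not propagate to any caller: {{CompletableFuture.whenComplete}} captures an exception thrown by its action in the stage it returns, and nobody observes that stage. As a result, the finalization chained onto {{handleException}}'s result is never attached. {{respondRobust}} shows one possible hardening, which is an assumption and not necessarily the fix that was merged.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * JDK-only sketch of the failure pattern in the trace. All methods are
 * hypothetical stand-ins for the AbstractHandler code, not Flink API.
 */
public class WhenCompleteDemo {

    /** Stand-in for handleException: dereferences the aggregator before returning a future. */
    static CompletableFuture<Void> handleException(Throwable error, Object aggregator) {
        int maxLength = aggregator.toString().length(); // NPE if aggregator is null
        return CompletableFuture.completedFuture(null);
    }

    /** Mirrors the original chaining; returns whether finalization ran. */
    static boolean respondFragile(CompletableFuture<Void> request, Object aggregator) {
        AtomicBoolean finalized = new AtomicBoolean(false);
        request.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                // If handleException throws, this inner whenComplete is never attached.
                handleException(throwable, aggregator)
                        .whenComplete((ignored2, throwable2) -> finalized.set(true));
            } else {
                finalized.set(true);
            }
        });
        return finalized.get();
    }

    /** One possible hardening: turn a synchronous throw into a failed future. */
    static boolean respondRobust(CompletableFuture<Void> request, Object aggregator) {
        AtomicBoolean finalized = new AtomicBoolean(false);
        request.whenComplete((ignored, throwable) -> {
            if (throwable == null) {
                finalized.set(true);
                return;
            }
            CompletableFuture<Void> errorResponse;
            try {
                errorResponse = handleException(throwable, aggregator);
            } catch (RuntimeException e) {
                errorResponse = new CompletableFuture<>();
                errorResponse.completeExceptionally(e);
            }
            // Runs whether the error response succeeded, failed, or was never built.
            errorResponse.whenComplete((ignored2, throwable2) -> finalized.set(true));
        });
        return finalized.get();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failedRequest = new CompletableFuture<>();
        failedRequest.completeExceptionally(new RuntimeException("stand-in for AskTimeoutException"));
        System.out.println("fragile finalized: " + respondFragile(failedRequest, null)); // false
        System.out.println("robust finalized: " + respondRobust(failedRequest, null));   // true
    }
}
```

Converting the synchronous failure into a failed future leaves a single completion path, so the finalizer runs on every branch.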
I am not sure why {{FlinkHttpObjectAggregator}} was null. However,
{{FlinkHttpObjectAggregator}} is only used here to obtain {{maxContentLength}}, so
why not pass {{RestHandlerConfiguration}} or
{{RestServerEndpointConfiguration}} as a constructor parameter to
{{AbstractHandler}}? That way we could access other configuration values as well,
which I think would be more flexible.
On the other hand, this is quite redundant, since most of those parameters would go unused.
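Here is a minimal sketch of the constructor-injection idea. It is hypothetical: {{ConfigInjectedHandler}}, {{errorPayload}} and the overhead value are illustrative assumptions, not Flink API. The handler receives only the configured maximum content length when it is created. As a result, the error path never has to look up {{FlinkHttpObjectAggregator}} in the channel pipeline.

```java
import java.util.Objects;

/**
 * Hypothetical sketch: the max content length is injected at construction,
 * so computing the error payload limit cannot hit a null pipeline handler.
 */
public class ConfigInjectedHandler {

    // Assumed value standing in for AbstractHandler's OTHER_RESP_PAYLOAD_OVERHEAD.
    static final int OTHER_RESP_PAYLOAD_OVERHEAD = 1024;
    static final String TRUNCATION_INDICATOR = "...";

    private final int maxErrorPayloadLength;

    ConfigInjectedHandler(int maxContentLength) {
        int payload = maxContentLength - OTHER_RESP_PAYLOAD_OVERHEAD;
        if (payload < TRUNCATION_INDICATOR.length()) {
            throw new IllegalArgumentException("maxContentLength too small: " + maxContentLength);
        }
        // Computed once from configuration; nothing on the error path can be null.
        this.maxErrorPayloadLength = payload;
    }

    /** Mimics Guava's Ascii.truncate: at most maxLength chars, ending with the indicator if cut. */
    static String truncate(String s, int maxLength, String indicator) {
        if (s.length() <= maxLength) {
            return s;
        }
        return s.substring(0, maxLength - indicator.length()) + indicator;
    }

    String errorPayload(String stackTrace) {
        return truncate(Objects.requireNonNull(stackTrace), maxErrorPayloadLength, TRUNCATION_INDICATOR);
    }

    public static void main(String[] args) {
        // 1034 - 1024 leaves room for 10 characters of payload.
        ConfigInjectedHandler handler = new ConfigInjectedHandler(1034);
        System.out.println(handler.errorPayload("short"));                   // short
        System.out.println(handler.errorPayload("a very long stack trace")); // a very ...
    }
}
```

Injecting just the single {{int}} keeps the constructor narrow. Passing the whole configuration object, as suggested above, trades that for access to more settings, with the redundancy the comment mentions.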
was (Author: tartarus):
> Fix Flink On YARN AM not exit
> -----------------------------
>
> Key: FLINK-18663
> URL: https://issues.apache.org/jira/browse/FLINK-18663
> Project: Flink
> Issue Type: Bug
> Components: Runtime / REST
> Affects Versions: 1.10.0, 1.10.1, 1.11.0
> Reporter: tartarus
> Assignee: tartarus
> Priority: Critical
> Labels: pull-request-available
> Attachments: 110.png, 111.png,
> C49A7310-F932-451B-A203-6D17F3140C0D.png,
> e18e00dd6664485c2ff55284fe969474.png, jobmanager.log.noyarn.tar.gz
>
>
> AbstractHandler throws an NPE because FlinkHttpObjectAggregator is null.
> When a REST handler fails with an exception, it executes this code:
> {code:java}
> private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
>     FlinkHttpObjectAggregator flinkHttpObjectAggregator = ctx.pipeline().get(FlinkHttpObjectAggregator.class);
>     int maxLength = flinkHttpObjectAggregator.maxContentLength() - OTHER_RESP_PAYLOAD_OVERHEAD;
>     if (throwable instanceof RestHandlerException) {
>         RestHandlerException rhe = (RestHandlerException) throwable;
>         String stackTrace = ExceptionUtils.stringifyException(rhe);
>         String truncatedStackTrace = Ascii.truncate(stackTrace, maxLength, "...");
>         if (log.isDebugEnabled()) {
>             log.error("Exception occurred in REST handler.", rhe);
>         } else {
>             log.error("Exception occurred in REST handler: {}", rhe.getMessage());
>         }
>         return HandlerUtils.sendErrorResponse(
>             ctx,
>             httpRequest,
>             new ErrorResponseBody(truncatedStackTrace),
>             rhe.getHttpResponseStatus(),
>             responseHeaders);
>     } else {
>         log.error("Unhandled exception.", throwable);
>         String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
>             ExceptionUtils.stringifyException(throwable));
>         String truncatedStackTrace = Ascii.truncate(stackTrace, maxLength, "...");
>         return HandlerUtils.sendErrorResponse(
>             ctx,
>             httpRequest,
>             new ErrorResponseBody(Arrays.asList("Internal server error.", truncatedStackTrace)),
>             HttpResponseStatus.INTERNAL_SERVER_ERROR,
>             responseHeaders);
>     }
> }
> {code}
> But in some cases flinkHttpObjectAggregator is null, so this throws an NPE.
> This method is called by AbstractHandler#respondAsLeader:
> {code:java}
> requestProcessingFuture
>     .whenComplete((Void ignored, Throwable throwable) -> {
>         if (throwable != null) {
>             handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
>                 .whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
>         } else {
>             finalizeRequestProcessing(finalUploadedFiles);
>         }
>     });
> {code}
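> Because the NPE is thrown synchronously inside the whenComplete callback, the chained
> whenComplete that calls finalizeRequestProcessing is never attached, so the
> InFlightRequestTracker cannot be cleared. As a result, the CompletableFuture returned
> by the handler's closeAsync never completes.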
> !C49A7310-F932-451B-A203-6D17F3140C0D.png!
> !e18e00dd6664485c2ff55284fe969474.png!
>
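For comparison with the handleException code quoted above, here is a hypothetical null-safe variant of the maxLength computation. It is a sketch only: {{Pipeline}} and {{Aggregator}} are simplified stand-ins for Netty's {{ChannelPipeline}} and {{FlinkHttpObjectAggregator}}, whose {{get(Class)}} returns null when no handler of that type is present. The fallback value is an assumption. The sketch does not explain why the aggregator was missing; it only shows that the error path itself need not throw.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of a null-safe maxLength computation.
 * Stand-in types only; the fallback and overhead values are assumptions.
 */
public class NullSafeLookupDemo {

    /** Stand-in for FlinkHttpObjectAggregator. */
    static final class Aggregator {
        private final int maxContentLength;

        Aggregator(int maxContentLength) {
            this.maxContentLength = maxContentLength;
        }

        int maxContentLength() {
            return maxContentLength;
        }
    }

    /** Stand-in for a channel pipeline: get(type) returns null if no such handler is present. */
    static final class Pipeline {
        private final Map<Class<?>, Object> handlers = new HashMap<>();

        <T> void add(Class<T> type, T handler) {
            handlers.put(type, handler);
        }

        <T> T get(Class<T> type) {
            return type.cast(handlers.get(type)); // cast(null) returns null
        }
    }

    static final int OTHER_RESP_PAYLOAD_OVERHEAD = 1024;             // assumed value
    static final int FALLBACK_MAX_CONTENT_LENGTH = 10 * 1024 * 1024; // hypothetical default

    /** Computes the error-payload limit without dereferencing a possibly-null handler. */
    static int maxErrorPayloadLength(Pipeline pipeline) {
        Aggregator aggregator = pipeline.get(Aggregator.class);
        int maxContentLength = (aggregator != null)
                ? aggregator.maxContentLength()
                : FALLBACK_MAX_CONTENT_LENGTH;
        return maxContentLength - OTHER_RESP_PAYLOAD_OVERHEAD;
    }

    public static void main(String[] args) {
        Pipeline withAggregator = new Pipeline();
        withAggregator.add(Aggregator.class, new Aggregator(104_857_600));
        System.out.println(maxErrorPayloadLength(withAggregator)); // 104856576
        System.out.println(maxErrorPayloadLength(new Pipeline())); // 10484736
    }
}
```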
--
This message was sent by Atlassian Jira
(v8.3.4#803005)