[
https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Arvid Heise resolved FLINK-17921.
---------------------------------
Resolution: Won't Fix
> RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected
> akka.timeout
> -----------------------------------------------------------------------------------
>
> Key: FLINK-17921
> URL: https://issues.apache.org/jira/browse/FLINK-17921
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination, Runtime / Task
> Affects Versions: 1.8.1, 1.10.1
> Reporter: zhangminglei
> Priority: Major
>
> As described in the summary, {{RpcGlobalAggregateManager#updateGlobalAggregate}}
> can fail with an akka timeout, but that is not the error message we want.
> If {{org.apache.flink.api.common.functions.AggregateFunction#getResult}}
> returns {{null}} and the result is used in
> {{RpcGlobalAggregateManager#updateGlobalAggregate}}, the following
> exception is thrown, which is not expected at that point. Increasing
> {{akka.ask.timeout}} to a larger value does not make the exception go
> away.
> {code:java}
> java.io.IOException: Error updating global aggregate.
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:47)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
>     ... 8 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
>     at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
>     at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
>     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
>     at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>     at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
> {code}
> The following stack trace shows the root cause. The key point is
> {{CompletableFuture.waitingGet}}: it implies that the
> {{CompletableFuture}} parks the current thread to wait for the result,
> which leads to the timeout of Flink's akka communication. Therefore, even
> with a timeout of 1 hour, the problem cannot be solved.
> {code:java}
> java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00000007b76617a8> (a java.util.concurrent.CompletableFuture$Signaller)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> Having {{RpcGlobalAggregateManager#updateGlobalAggregate}} use
> {{get(long timeout, TimeUnit unit)}} would be a good choice. That way, the
> timeout information would truly reflect the current state of the program.
> The {{akka.timeout}} error is too broad, which is not conducive to user
> troubleshooting.
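
For illustration only, the reporter's suggestion of a bounded wait could be sketched roughly as follows. This is not Flink code: the class BoundedAggregateWait, the method awaitResult and the two new error messages are hypothetical names chosen for this sketch. Only CompletableFuture#get(long, TimeUnit) and the "Error updating global aggregate." message come from the report and the JDK.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedAggregateWait {

    /**
     * Hypothetical helper: waits for an RPC-style future with an explicit
     * bound instead of the unbounded CompletableFuture#get().
     */
    static <T> T awaitResult(CompletableFuture<T> future, long timeout, TimeUnit unit)
            throws IOException {
        try {
            // Bounded wait: the calling thread is parked for at most `timeout`.
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            // A dedicated message tells the user which wait ran out of time.
            throw new IOException(
                    "Global aggregate update did not complete within "
                            + timeout + " " + unit, e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while updating global aggregate.", e);
        } catch (ExecutionException e) {
            // Same wrapping as in the stack trace quoted in the report.
            throw new IOException("Error updating global aggregate.", e);
        }
    }

    public static void main(String[] args) {
        // A future that is never completed stands in for a reply that never arrives.
        CompletableFuture<Object> neverCompleted = new CompletableFuture<>();
        try {
            awaitResult(neverCompleted, 100, TimeUnit.MILLISECONDS);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With this shape, a local timeout and a failure reported by the remote side surface as different messages, which is the troubleshooting benefit the report asks for. Whether that is appropriate inside Flink is a separate question, and the issue itself was resolved as Won't Fix.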