[ https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17245224#comment-17245224 ]
Arvid Heise commented on FLINK-17921:
-------------------------------------
[~mingleizhang], your proposal is definitely a good way of thinking: blocking
the task thread for a long time is not a good idea. However, I don't see how
we can easily fix it. If you don't pass an explicit timeout, the default timeout
of 10 s is applied, as you have seen. We could increase or decrease the timeout,
but any particular value would be rather arbitrary.
In a better world, we would fire the request asynchronously and check the
result later on. However, in the current version, this service is only used for
syncing watermarks: we send a watermark and immediately receive the global
watermark back. So it is a synchronous call, and we cannot make it work
asynchronously; that is inherent in the design.
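For comparison, a minimal sketch of the "fire now, check later" pattern using plain {{CompletableFuture}}. The class {{AsyncWatermarkSync}}, its methods, and the trivial local "aggregation" are hypothetical stand-ins, not Flink APIs. The task thread only polls the future with {{isDone()}} instead of parking in {{get()}}, at the cost of working with a global watermark that is one round trip old. As stated above, today's watermark sync needs the global value immediately, so this pattern does not fit the current design.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: not a Flink API.
class AsyncWatermarkSync {
    private final ExecutorService rpcExecutor = Executors.newSingleThreadExecutor();
    // Last global watermark we have received; MIN_VALUE means "none yet".
    private long lastKnownGlobalWatermark = Long.MIN_VALUE;
    // Outstanding request, or null if none is in flight.
    private CompletableFuture<Long> pending = null;

    // Stand-in for the remote aggregation: the "global" value is just the local one.
    private CompletableFuture<Long> requestGlobalWatermark(long localWatermark) {
        return CompletableFuture.supplyAsync(() -> localWatermark, rpcExecutor);
    }

    /** Called periodically from the task thread; never blocks. */
    long onPeriodicEmit(long localWatermark) {
        if (pending != null && pending.isDone()) {
            if (!pending.isCompletedExceptionally()) {
                lastKnownGlobalWatermark = pending.join(); // already done, so no waiting
            }
            pending = null;
        }
        if (pending == null) {
            pending = requestGlobalWatermark(localWatermark); // fire and forget for now
        }
        // Return the last known value; a fresh result is picked up on a later call.
        return lastKnownGlobalWatermark;
    }

    void close() {
        rpcExecutor.shutdown();
    }
}
{code}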
That leads me to believe that the design of the feature is broken. Moreover,
the intent is to sync watermarks across sources, which will be solved in a
more scalable way in the 1.13 release. Then, we can hopefully remove this code
along with its synchronous design.
However, the ticket actually revealed two separate issues. First, the JobMaster
side of {{updateGlobalAggregate}} does not handle user code errors at all; in
particular, {{null}} values returned by the {{aggregateFunction}} should result
in an exception. Second, and more importantly, the JobMaster never completes the
future on the caller side if that future is completed with {{null}} on the
JobManager side. Both issues will be addressed in a follow-up ticket by
[~trohrmann].
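One way to approach the first issue is to validate the user function's result on the JobMaster side and fail the returned future explicitly, so the caller receives a meaningful error instead of an ask timeout. This is a minimal sketch under assumptions: {{GlobalAggregateHandler}} and {{handleUpdate}} are hypothetical names, the user's aggregate logic is modeled as a plain {{BiFunction}}, and this is not necessarily how the follow-up ticket resolves it.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Hypothetical JobMaster-side handler; not Flink code.
class GlobalAggregateHandler {
    // Current accumulated value per aggregate name.
    private final Map<String, Object> accumulators = new HashMap<>();

    CompletableFuture<Object> handleUpdate(
            String aggregateName,
            Object value,
            BiFunction<Object, Object, Object> userAggregate) {
        try {
            Object current = accumulators.get(aggregateName);
            // User code may throw or return null; both cases must reach the caller.
            Object updated = Objects.requireNonNull(
                    userAggregate.apply(current, value),
                    "Aggregate function for '" + aggregateName + "' returned null");
            accumulators.put(aggregateName, updated);
            return CompletableFuture.completedFuture(updated);
        } catch (RuntimeException e) {
            // Fail the future explicitly instead of leaving the caller to time out.
            CompletableFuture<Object> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }
}
{code}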
> RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected
> akka.timeout
> -----------------------------------------------------------------------------------
>
> Key: FLINK-17921
> URL: https://issues.apache.org/jira/browse/FLINK-17921
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination, Runtime / Task
> Affects Versions: 1.8.1, 1.10.1
> Reporter: zhangminglei
> Priority: Major
>
> As described in the summary, {{RpcGlobalAggregateManager#updateGlobalAggregate}}
> can fail with an akka timeout, but that is not the error message we want.
> If {{org.apache.flink.api.common.functions.AggregateFunction#getResult}}
> returns {{null}} and is used in
> {{RpcGlobalAggregateManager#updateGlobalAggregate}}, the following exception
> is thrown, which is not the exception one would expect here. Increasing
> {{akka.ask.timeout}} to a different value does not make the exception go away.
> {code:java}
> java.io.IOException: Error updating global aggregate.
> at
> org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:47)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException:
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message
> of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical
> reason for `AskTimeoutException` is that the recipient actor didn't send a
> reply.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
> ... 8 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message
> of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical
> reason for `AskTimeoutException` is that the recipient actor didn't send a
> reply.
> at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
> {code}
> The following stack trace shows the root cause. The key point is
> {{CompletableFuture.waitingGet}}: the current thread parks until the
> {{CompletableFuture}} completes, which leads to the timeout of Flink's akka
> communication. Therefore, even a timeout of 1 hour would not solve the
> problem.
> {code:java}
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000007b76617a8> (a
> java.util.concurrent.CompletableFuture$Signaller)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
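The observation that a longer timeout cannot help can be reproduced with plain {{CompletableFuture}}: if nothing ever completes the future (as the comment above describes for a {{null}} result), {{get(timeout, unit)}} fails with a {{TimeoutException}} for every timeout value, and a larger timeout only delays the failure. {{NeverCompletedDemo}} is a hypothetical illustration, not Flink code.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class NeverCompletedDemo {
    /** Returns true if waiting on a never-completed future timed out. */
    static boolean timesOut(long timeoutMillis) {
        // Stands in for a reply that is never sent back to the caller.
        CompletableFuture<Object> neverCompleted = new CompletableFuture<>();
        try {
            neverCompleted.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true; // the calling thread stayed parked for the whole timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        }
    }
}
{code}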
> Making {{RpcGlobalAggregateManager#updateGlobalAggregate}} use
> {{get(long timeout, TimeUnit unit)}} would be a good choice. That way, the
> timeout information truly reflects the current state of the program, whereas
> the generic akka timeout error is too broad and does not help users
> troubleshoot.
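A minimal sketch of that proposal, assuming a hypothetical helper {{awaitGlobalAggregate}} that is not part of Flink: the caller waits with {{get(long timeout, TimeUnit unit)}} and, on timeout, throws an {{IOException}} naming the aggregate and the timeout instead of surfacing a generic ask timeout. Per the comment above, this only improves the error message; the task thread still blocks for the full timeout.

{code:java}
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedAggregateWait {
    /** Waits for a global aggregate result and reports failures with context. */
    static <T> T awaitGlobalAggregate(
            String aggregateName, CompletableFuture<T> result, long timeout, TimeUnit unit)
            throws IOException {
        try {
            return result.get(timeout, unit);
        } catch (TimeoutException e) {
            // Name the aggregate and the timeout so users know where to look.
            throw new IOException(
                    "No result for global aggregate '" + aggregateName + "' within "
                            + timeout + " " + unit + "; the JobMaster may not have replied.",
                    e);
        } catch (ExecutionException e) {
            throw new IOException(
                    "Error updating global aggregate '" + aggregateName + "'.", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for global aggregate.", e);
        }
    }
}
{code}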
--
This message was sent by Atlassian Jira
(v8.3.4#803005)