[ 
https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17245224#comment-17245224
 ] 

Arvid Heise commented on FLINK-17921:
-------------------------------------

[~mingleizhang], your proposal is definitely a good way of thinking: blocking
the task thread for a long time is not a good idea. However, I don't see how
we can easily fix it. If you don't use an explicit timeout, the default timeout
of 10 s is applied, as you have seen. We could increase or decrease the
timeout, but any particular value is rather arbitrary.
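A minimal sketch of the explicit-timeout variant proposed in the ticket, using only java.util.concurrent. The class and method names are hypothetical (this is not Flink's RpcGlobalAggregateManager), and the RPC reply is modelled as a CompletableFuture that is never completed, as in the reported case. A bounded get(long, TimeUnit) only lets the caller fail earlier and with its own message; the chosen value stays arbitrary, and the underlying future is still never completed.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical caller-side helper; not part of Flink. */
class BoundedAggregateWait {

    /** Waits at most timeoutMillis for the reply instead of calling the unbounded get(). */
    static <T> T awaitGlobalAggregate(CompletableFuture<T> reply, long timeoutMillis)
            throws IOException {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Report the local wait explicitly rather than a generic ask timeout.
            throw new IOException(
                    "Global aggregate was not received within " + timeoutMillis + " ms", e);
        } catch (ExecutionException e) {
            // The remote side failed the future: surface its cause.
            throw new IOException("Error updating global aggregate.", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IOException("Interrupted while waiting for the global aggregate", e);
        }
    }

    public static void main(String[] args) {
        // A reply that is never completed, as in the case reported in this ticket.
        CompletableFuture<Long> neverCompleted = new CompletableFuture<>();
        try {
            awaitGlobalAggregate(neverCompleted, 100);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running main prints "Global aggregate was not received within 100 ms" after roughly 100 ms; the error is clearer, but the task thread is still blocked for the whole wait.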

In a better world, we would fire the request asynchronously and check the
result later on. However, in the current version, this service is only used
for synchronizing watermarks: we send a watermark and immediately receive the
global watermark. So it is a synchronous call, and we cannot make it
asynchronous; that is inherent in the design.
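For comparison, the "fire asynchronously and check later" pattern could look roughly like this sketch. It assumes a hypothetical sendLocalWatermark function that returns a CompletableFuture carrying the global watermark; neither the class nor that function exists in Flink. Each periodic tick consumes a reply only if it has already arrived, so the task thread never parks, but the returned global watermark can lag by one or more ticks, which the current synchronous watermark-sync usage does not allow.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical non-blocking watermark sync; not part of Flink. */
class AsyncWatermarkSync {

    // Assumed transport: sends the local watermark, replies with the global one.
    private final Function<Long, CompletableFuture<Long>> sendLocalWatermark;
    private CompletableFuture<Long> pending;           // request in flight, if any
    private long lastGlobalWatermark = Long.MIN_VALUE; // nothing received yet

    AsyncWatermarkSync(Function<Long, CompletableFuture<Long>> sendLocalWatermark) {
        this.sendLocalWatermark = sendLocalWatermark;
    }

    /** Called periodically by the task; never blocks on the reply. */
    long onTick(long localWatermark) {
        if (pending != null && pending.isDone()) {
            if (!pending.isCompletedExceptionally()) {
                Long global = pending.join(); // already complete, so join() returns immediately
                if (global != null) {
                    lastGlobalWatermark = global;
                }
            }
            pending = null; // failed, null, or consumed: allow a new request
        }
        if (pending == null) {
            pending = sendLocalWatermark.apply(localWatermark); // fire, check on a later tick
        }
        return lastGlobalWatermark;
    }

    public static void main(String[] args) {
        // Stub transport: returns a future that we complete by hand below.
        CompletableFuture<Long> reply = new CompletableFuture<>();
        AsyncWatermarkSync sync = new AsyncWatermarkSync(local -> reply);
        System.out.println(sync.onTick(5L)); // request sent, no reply yet
        reply.complete(3L);                  // the reply arrives later
        System.out.println(sync.onTick(7L)); // reply consumed without blocking
    }
}
```

The first tick returns Long.MIN_VALUE because no reply has arrived yet; the second returns 3. Keeping at most one request in flight avoids piling up unanswered requests when the other side is slow.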

That leads me to believe that the design of the feature is broken. What's
more, the intent is to synchronize watermarks across the sources, which will
be solved in a more scalable way in the 1.13 release. Then, we can hopefully
remove this code along with its synchronous design.

However, the ticket actually revealed two separate issues. First, the JobMaster
side of updateGlobalAggregate does not handle user-code errors at all; in
particular, a null result of the aggregateFunction should throw an exception.
Second, and more importantly, the JobMaster never completes the future on the
caller side if it is completed with null on the JobManager side. Both issues
will be addressed in a follow-up ticket by [~trohrmann].
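A minimal sketch of the JobMaster-side rule described above, assuming a hypothetical handler in which a java.util.function.Supplier stands in for invoking the user's aggregate function. It is not the fix from the follow-up ticket; it only illustrates the invariant that the returned future must always be completed, either with a non-null value or exceptionally, so the caller sees a descriptive error instead of an ask timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

/** Hypothetical JobMaster-side handler; not Flink's implementation. */
class GlobalAggregateHandler {

    /**
     * Runs user code that computes the aggregate result and guarantees that the
     * returned future completes: normally with a non-null value, or exceptionally.
     */
    static <R> CompletableFuture<R> updateGlobalAggregate(String aggregateName, Supplier<R> userCode) {
        CompletableFuture<R> result = new CompletableFuture<>();
        try {
            R value = userCode.get();
            if (value == null) {
                // A null result becomes an explicit error instead of a silent hang.
                result.completeExceptionally(new IllegalStateException(
                        "Aggregate '" + aggregateName + "' returned null"));
            } else {
                result.complete(value);
            }
        } catch (RuntimeException e) {
            // Exceptions thrown by user code are reported to the caller as well.
            result.completeExceptionally(e);
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Long> reply = updateGlobalAggregate("watermark", () -> null);
        try {
            reply.join();
        } catch (CompletionException e) {
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

Running main prints "Aggregate 'watermark' returned null" immediately, rather than waiting for the ask timeout to expire.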

> RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-17921
>                 URL: https://issues.apache.org/jira/browse/FLINK-17921
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination, Runtime / Task
>    Affects Versions: 1.8.1, 1.10.1
>            Reporter: zhangminglei
>            Priority: Major
>
> As described in the summary, {{RpcGlobalAggregateManager#updateGlobalAggregate}}
> can cause an akka timeout, but that is not the error message we want.
> If {{org.apache.flink.api.common.functions.AggregateFunction#getResult}}
> returns {{null}} and its result is used in
> {{RpcGlobalAggregateManager#updateGlobalAggregate}}, the following exception
> is thrown, which is not the error one would expect at that point. Increasing
> {{akka.ask.timeout}} to another value does not make the exception go away.
> {code:java}
> java.io.IOException: Error updating global aggregate.
> at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:47)
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
> ... 8 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
> at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
> at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
> {code}
> The following stack trace shows the root cause. The key point is
> {{CompletableFuture.waitingGet}}: it shows that the {{CompletableFuture}}
> parks the current thread while waiting, which leads to the timeout of
> Flink's akka communication. Therefore, even a timeout of 1 hour cannot
> solve the problem.
> {code:java}
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000007b76617a8> (a java.util.concurrent.CompletableFuture$Signaller)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Making {{RpcGlobalAggregateManager#updateGlobalAggregate}} use
> {{get(long timeout, TimeUnit unit)}} would be a good choice. That way, the
> timeout information truly reflects the current state of the program, whereas
> an {{akka.ask.timeout}} error is too generic, which makes troubleshooting
> harder for users.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)