[ https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zhangminglei updated FLINK-17921:
---------------------------------
    Description:
As described in the summary, {{RpcGlobalAggregateManager#updateGlobalAggregate}} can cause an Akka timeout, but that error message does not describe the actual problem.

If {{org.apache.flink.api.common.functions.AggregateFunction#getResult}} returns {{null}} and that function is used in {{RpcGlobalAggregateManager#updateGlobalAggregate}}, the following exception is thrown, which is not where the failure is expected to surface. Increasing {{akka.ask.timeout}} to a larger value does not make the exception go away.

{code:java}
java.io.IOException: Error updating global aggregate.
	at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:47)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
	... 8 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
	at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
	at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
{code}

The following thread dump points to the root cause. It shows the thread parked in {{CompletableFuture.waitingGet}}: the untimed {{CompletableFuture#get()}} call makes the current thread wait for the reply, which leads to the Akka communication timeout in Flink. Therefore, even a timeout of 1 hour would not solve the problem.
{code:java}
java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x00000007b76617a8> (a java.util.concurrent.CompletableFuture$Signaller)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
	at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
	at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}

We propose changing {{RpcGlobalAggregateManager#updateGlobalAggregate}} to use {{get(long timeout, TimeUnit unit)}} instead of the untimed {{get()}}. A dedicated timeout error would then reflect the actual state of the program, whereas the generic Akka ask timeout error is too broad and makes troubleshooting harder for users.
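A minimal sketch of the proposed change, shown in isolation from Flink. It assumes only that the RPC call returns a {{CompletableFuture<Object>}}, as the {{CompletableFuture.get}} frames in the stack traces suggest. The class and method names ({{TimedGlobalAggregateSketch}}, {{awaitGlobalAggregate}}) and the {{timeoutMillis}} parameter are hypothetical and not part of Flink's API; a real patch would take the future from the JobMaster gateway and the timeout from some configured value, which is an assumption here.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: wait for a global-aggregate RPC reply with a bounded timeout. */
public class TimedGlobalAggregateSketch {

    /**
     * Waits at most timeoutMillis for the reply future (hypothetical helper, not Flink API).
     * Each failure mode is mapped to an IOException whose message names the aggregate.
     */
    public static Object awaitGlobalAggregate(
            String aggregateName, CompletableFuture<Object> reply, long timeoutMillis)
            throws IOException {
        try {
            // Timed get: returns the result or throws TimeoutException, never parks forever.
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IOException(
                    "Timed out after " + timeoutMillis
                            + " ms waiting for global aggregate '" + aggregateName + "'",
                    e);
        } catch (ExecutionException e) {
            // The remote call failed; surface the underlying cause.
            throw new IOException(
                    "Error updating global aggregate '" + aggregateName + "'", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new IOException(
                    "Interrupted while updating global aggregate '" + aggregateName + "'", e);
        }
    }

    public static void main(String[] args) throws IOException {
        // A reply that has already arrived is returned immediately.
        Object value =
                awaitGlobalAggregate("sum", CompletableFuture.<Object>completedFuture(42L), 100);
        System.out.println("result: " + value);

        // A reply that never arrives fails after the bounded wait instead of blocking.
        try {
            awaitGlobalAggregate("sum", new CompletableFuture<Object>(), 100);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a bounded wait, the exception names the aggregate and the elapsed time instead of surfacing as a generic {{AskTimeoutException}}. Note that a timed {{get}} only makes the failure explicit and local; it does not by itself make a missing reply arrive.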
> RpcGlobalAggregateManager#updateGlobalAggregate would cause akka.timeout
> ------------------------------------------------------------------------
>
>                 Key: FLINK-17921
>                 URL: https://issues.apache.org/jira/browse/FLINK-17921
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: zhangminglei
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)