tillrohrmann commented on a change in pull request #17606:
URL: https://github.com/apache/flink/pull/17606#discussion_r740889079



##########
File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
##########
@@ -240,17 +241,20 @@ private Object invokeRpc(Method method, Object[] args) throws Exception {
             final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
 
             final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
-            resultFuture.whenComplete(
-                    (resultValue, failure) -> {
-                        if (failure != null) {
-                            completableFuture.completeExceptionally(
-                                    resolveTimeoutException(
-                                            failure, callStackCapture, address, rpcInvocation));
-                        } else {
-                            completableFuture.complete(
-                                    deserializeValueIfNeeded(resultValue, method));
-                        }
-                    });
+            FutureUtils.forward(

Review comment:
       I think you are right. We don't need the `forward` here.
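The removed `whenComplete` block already completes the new future itself, which is presumably why an extra forwarding step adds nothing here. A minimal plain-JDK sketch of that pattern, assuming hypothetical stand-ins `resolveFailure` and `deserialize` for Flink's `resolveTimeoutException` and `deserializeValueIfNeeded` (the real methods take more arguments and do more work):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

class ExplicitBridge {

    // Hypothetical stand-in for resolveTimeoutException(...): enriches timeouts, passes other failures through.
    static Throwable resolveFailure(Throwable failure) {
        if (failure instanceof TimeoutException) {
            return new TimeoutException("Invocation timed out: " + failure.getMessage());
        }
        return failure;
    }

    // Hypothetical stand-in for deserializeValueIfNeeded(...): identity here.
    static Object deserialize(Object value) {
        return value;
    }

    // Mirrors the removed block: the returned future is completed directly,
    // so callers observe the resolved exception itself, not a wrapper.
    static CompletableFuture<Object> bridge(CompletableFuture<?> resultFuture) {
        final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
        resultFuture.whenComplete(
                (resultValue, failure) -> {
                    if (failure != null) {
                        completableFuture.completeExceptionally(resolveFailure(failure));
                    } else {
                        completableFuture.complete(deserialize(resultValue));
                    }
                });
        return completableFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new TimeoutException("ask"));
        // prints TimeoutException
        bridge(failed).whenComplete((v, t) -> System.out.println(t.getClass().getSimpleName()));
    }
}
```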

##########
File path: flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
##########
@@ -67,7 +67,7 @@ public void testTaskManagerFailure(Configuration configuration, final File coord
         ExecutionEnvironment env =
                 ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration);
         env.setParallelism(PARALLELISM);
-        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1500L));
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1500L));

Review comment:
       I don't fully understand why this change is now required. Can we explain 
why this is the case?

##########
File path: flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
##########
@@ -67,7 +67,7 @@ public void testTaskManagerFailure(Configuration configuration, final File coord
         ExecutionEnvironment env =
                 ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration);
         env.setParallelism(PARALLELISM);
-        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1500L));
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1500L));

Review comment:
       For this test, we have the heartbeat interval set to 200ms and the 
restart delay set to 1.5s. Hence, multiple heartbeats should be sent 
during the restart delay. Moreover, we mark TMs as unreachable after a single 
lost message. Hence, I can only think of a processing gap on the test machine 
to explain this situation. But the logs say otherwise :-(
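The arithmetic behind this expectation, as a tiny sketch that assumes only the two values quoted above (200 ms heartbeat interval, 1.5 s restart delay); the class and method names are illustrative, not Flink code:

```java
class HeartbeatBudget {

    // Values quoted in the review comment; assumptions, not read from a Flink configuration.
    static final long HEARTBEAT_INTERVAL_MS = 200L;
    static final long RESTART_DELAY_MS = 1500L;

    // Number of complete heartbeat intervals that fit into the restart delay.
    static long heartbeatsDuringDelay(long restartDelayMs, long heartbeatIntervalMs) {
        return restartDelayMs / heartbeatIntervalMs;
    }

    public static void main(String[] args) {
        long n = heartbeatsDuringDelay(RESTART_DELAY_MS, HEARTBEAT_INTERVAL_MS);
        // 1500 / 200 = 7 with integer division, so a killed TaskManager should
        // miss several heartbeats before the restart fires. Prints 7.
        System.out.println("Heartbeats during restart delay: " + n);
    }
}
```

If a single lost message is enough to mark a TM as unreachable, only a stall on the test machine lasting most of the 1.5s window would seem to explain a restart onto the dead TM.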

##########
File path: flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
##########
@@ -67,7 +67,7 @@ public void testTaskManagerFailure(Configuration configuration, final File coord
         ExecutionEnvironment env =
                 ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration);
         env.setParallelism(PARALLELISM);
-        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1500L));
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1500L));

Review comment:
       But maybe the error margin is still too small, and we should either 
increase the number of restart attempts or increase the restart delay.
   
   For reference, I tried to harden the test case via #17107.
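The two options trade off differently: more attempts tolerate more spurious failures, while a longer delay gives failure detection more time before a single retry. A rough sketch of the cumulative waiting time for a fixed-delay strategy, assuming every attempt is used and ignoring job runtime; the 5000 ms delay is a hypothetical value, not one proposed in the review:

```java
class RestartBudget {

    // Total time spent in restart delays if all attempts are consumed.
    static long cumulativeDelayMs(int restartAttempts, long delayMs) {
        return restartAttempts * delayMs;
    }

    public static void main(String[] args) {
        // fixedDelayRestart(1, 1500L) from the diff: one tolerated failure, 1500 ms of delay.
        System.out.println("before: " + cumulativeDelayMs(1, 1500L) + " ms over 1 attempt");
        // fixedDelayRestart(10, 1500L) from the diff: ten tolerated failures, 15000 ms of delay.
        System.out.println("after: " + cumulativeDelayMs(10, 1500L) + " ms over 10 attempts");
        // Hypothetical alternative: one attempt with a longer delay.
        System.out.println("longer delay: " + cumulativeDelayMs(1, 5000L) + " ms over 1 attempt");
    }
}
```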

##########
File path: flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
##########
@@ -67,7 +67,7 @@ public void testTaskManagerFailure(Configuration configuration, final File coord
         ExecutionEnvironment env =
                 ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration);
         env.setParallelism(PARALLELISM);
-        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1500L));
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1500L));

Review comment:
       Hmm, by removing the forwarding we do indeed change the exception type 
for consumers of the returned future. If we don't want to accept this risk, 
then I think it is probably safer to revert this change.
   
   In general, though, consumers of futures should handle the `CompletionException` 
case (e.g. by stripping it or by using `ExceptionUtils.containsCause`).
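One likely source of such a type change is how the JDK propagates failures: a stage derived from a failed future (for example via `thenApply`) completes with the original exception wrapped in a `CompletionException`, whereas `completeExceptionally` on a fresh future keeps the raw type. A plain-JDK sketch; `stripCompletionException` and `containsCause` below are hypothetical hand-rolled helpers that only imitate the idea behind Flink's `ExceptionUtils` utilities, not their actual implementations:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

class CompletionExceptionDemo {

    // Hypothetical helper: unwraps nested CompletionExceptions down to the original cause.
    static Throwable stripCompletionException(Throwable t) {
        Throwable current = t;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Hypothetical helper: checks whether any throwable in the cause chain has the given type.
    static boolean containsCause(Throwable t, Class<? extends Throwable> type) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = new CompletableFuture<>();
        source.completeExceptionally(new TimeoutException("ask timed out"));

        // The derived stage reports the failure wrapped in a CompletionException.
        CompletableFuture<Integer> derived = source.thenApply(String::length);
        derived.whenComplete(
                (value, failure) -> {
                    System.out.println(failure.getClass().getSimpleName()); // CompletionException
                    System.out.println(stripCompletionException(failure).getClass().getSimpleName()); // TimeoutException
                    System.out.println(containsCause(failure, TimeoutException.class)); // true
                });
    }
}
```

Consumers that match on the exception type directly would break on the wrapper, while consumers that strip it or search the cause chain keep working either way.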




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
