tillrohrmann commented on a change in pull request #17606:
URL: https://github.com/apache/flink/pull/17606#discussion_r740889079
##########
File path:
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
##########
@@ -240,17 +241,20 @@ private Object invokeRpc(Method method, Object[] args) throws Exception {
final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
- resultFuture.whenComplete(
-         (resultValue, failure) -> {
-             if (failure != null) {
-                 completableFuture.completeExceptionally(
-                         resolveTimeoutException(
-                                 failure, callStackCapture, address, rpcInvocation));
-             } else {
-                 completableFuture.complete(
-                         deserializeValueIfNeeded(resultValue, method));
-             }
-         });
+ FutureUtils.forward(
Review comment:
I think you are right. We don't need the `forward` here.
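For context, the removed lines map the `ask` result into a fresh `CompletableFuture` by completing it directly from a `whenComplete` callback. The sketch below reproduces that pattern in isolation. `ManualCompletionSketch`, `resolveFailure` and `deserialize` are hypothetical names made up for illustration; the two helpers merely stand in for `resolveTimeoutException` and `deserializeValueIfNeeded` and are not Flink code. Because `completeExceptionally` stores the failure as-is, `whenComplete`/`handle` callbacks registered on the returned future receive the resolved exception itself rather than a `CompletionException`.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Sketch of the manual-completion pattern from the removed diff lines.
// HYPOTHETICAL helpers: resolveFailure/deserialize only mimic the roles of
// resolveTimeoutException/deserializeValueIfNeeded; they are not Flink code.
final class ManualCompletionSketch {

    // Hypothetical: add context to timeouts, pass other failures through unchanged.
    static Throwable resolveFailure(Throwable failure) {
        if (failure instanceof TimeoutException) {
            return new TimeoutException("Invocation timed out: " + failure.getMessage());
        }
        return failure;
    }

    // Hypothetical: decode raw bytes, leave already-deserialized values alone.
    static Object deserialize(Object value) {
        if (value instanceof byte[]) {
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        return value;
    }

    // Completes the returned future directly from the callback, so no separate
    // forwarding step is involved. Failures are stored unwrapped.
    static CompletableFuture<Object> mapResult(CompletableFuture<?> resultFuture) {
        final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
        resultFuture.whenComplete(
                (resultValue, failure) -> {
                    if (failure != null) {
                        completableFuture.completeExceptionally(resolveFailure(failure));
                    } else {
                        completableFuture.complete(deserialize(resultValue));
                    }
                });
        return completableFuture;
    }
}
```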
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
##########
@@ -67,7 +67,7 @@ public void testTaskManagerFailure(Configuration configuration, final File coord
ExecutionEnvironment env =
        ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration);
env.setParallelism(PARALLELISM);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1500L));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1500L));
Review comment:
I don't fully understand why this change is now required. Can we explain
why this is the case?
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
##########
@@ -67,7 +67,7 @@ public void testTaskManagerFailure(Configuration configuration, final File coord
ExecutionEnvironment env =
        ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration);
env.setParallelism(PARALLELISM);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1500L));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1500L));
Review comment:
For this test, we have set the heartbeat interval to 200ms and the restart
delay to 1.5s. Hence roughly seven heartbeats (1500ms / 200ms) should be sent
during a single restart delay. Moreover, we mark TMs as unreachable after a
single lost message. Hence, I can only think of a processing gap on the test
machine to explain this situation. But the logs suggest otherwise :-(
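The timing argument can be checked with simple arithmetic. The sketch below is a back-of-the-envelope model under loudly stated assumptions: a TaskManager is flagged after `allowedMisses` consecutive missed heartbeats, and detection latency is roughly `allowedMisses × interval`, ignoring processing gaps such as GC pauses or an overloaded test machine. `HeartbeatTiming` and its methods are hypothetical; they do not reflect how Flink's heartbeat services actually compute timeouts.

```java
// Back-of-the-envelope model of the timing argument. ASSUMPTIONS (hypothetical):
// a TaskManager is flagged after allowedMisses consecutive missed heartbeats,
// and detection latency is about allowedMisses * interval, ignoring processing gaps.
final class HeartbeatTiming {

    // Number of full heartbeat intervals that fit into one restart delay.
    static long heartbeatsPerDelay(long restartDelayMs, long heartbeatIntervalMs) {
        if (heartbeatIntervalMs <= 0) {
            throw new IllegalArgumentException("heartbeat interval must be positive");
        }
        return restartDelayMs / heartbeatIntervalMs;
    }

    // Time left in the restart delay after the (assumed) detection latency.
    static long slackMs(long restartDelayMs, long heartbeatIntervalMs, int allowedMisses) {
        return restartDelayMs - (long) allowedMisses * heartbeatIntervalMs;
    }

    public static void main(String[] args) {
        System.out.println(heartbeatsPerDelay(1500L, 200L)); // prints 7
        System.out.println(slackMs(1500L, 200L, 1)); // prints 1300
    }
}
```

With the values from the comment (200ms interval, 1.5s delay, one allowed miss), seven full heartbeat intervals fit into one restart delay, leaving about 1.3s of slack under this model, which is why only an unexpected processing gap seemed to explain the failure.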
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
##########
@@ -67,7 +67,7 @@ public void testTaskManagerFailure(Configuration configuration, final File coord
ExecutionEnvironment env =
        ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration);
env.setParallelism(PARALLELISM);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1500L));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1500L));
Review comment:
But maybe the error margin is still too small, and we should increase either
the number of restart attempts or the restart delay.
For reference, I tried to harden this test case via #17107.
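The trade-off between more restart attempts and a longer restart delay can be illustrated with a simplified counter model of a fixed-delay restart strategy. `FixedDelayRestartModel` is hypothetical: it only counts restarts and accumulated delay, and it is not Flink's implementation of `RestartStrategies.fixedDelayRestart`.

```java
// Simplified model of a fixed-delay restart strategy, in the spirit of
// RestartStrategies.fixedDelayRestart(attempts, delayMs). HYPOTHETICAL class:
// it only counts restarts and accumulated delay; it is not Flink's implementation.
final class FixedDelayRestartModel {
    private final int maxAttempts;
    private final long delayMs;
    private int restarts;

    FixedDelayRestartModel(int maxAttempts, long delayMs) {
        if (maxAttempts < 0 || delayMs < 0) {
            throw new IllegalArgumentException("attempts and delay must be non-negative");
        }
        this.maxAttempts = maxAttempts;
        this.delayMs = delayMs;
    }

    // Called on each job failure; returns true if another restart is allowed.
    boolean onFailure() {
        if (restarts >= maxAttempts) {
            return false; // budget exhausted: the job fails terminally
        }
        restarts++;
        return true;
    }

    // Total time spent waiting in restart delays so far.
    long accumulatedDelayMs() {
        return restarts * delayMs;
    }

    // How many of `failures` consecutive failures are survived before giving up.
    static int survivedFailures(int maxAttempts, long delayMs, int failures) {
        FixedDelayRestartModel model = new FixedDelayRestartModel(maxAttempts, delayMs);
        int survived = 0;
        for (int i = 0; i < failures && model.onFailure(); i++) {
            survived++;
        }
        return survived;
    }
}
```

Under this model, `fixedDelayRestart(1, 1500L)` ends the job on the second failure, whereas `fixedDelayRestart(10, 1500L)` tolerates ten failures and spends at most 10 × 1.5s = 15s in restart delays. Raising the delay instead widens each individual window for detecting a lost TaskManager; the model does not tell which failure mode the test actually hits.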
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
##########
@@ -67,7 +67,7 @@ public void testTaskManagerFailure(Configuration configuration, final File coord
ExecutionEnvironment env =
        ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration);
env.setParallelism(PARALLELISM);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1500L));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1500L));
Review comment:
Hmm, by removing the forwarding we do indeed change the exception type that
consumers of the returned future observe: a failure may now arrive wrapped in
a `CompletionException` instead of as the original exception. If we don't want
to accept this risk, then I think it is probably safer to revert this change.
That said, consumers of futures should in general handle the
`CompletionException` case (e.g. by stripping it or by using
`ExceptionUtils.containsCause`).
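The exception-type difference can be reproduced with plain JDK futures. In the sketch below, a future completed via `completeExceptionally` hands the original exception to `handle` callbacks, while a stage derived from it with `thenApply` hands over a `CompletionException` wrapping that exception. `strip` and `containsCause` are hypothetical local helpers written in the spirit of the stripping and cause-checking mentioned above; they are not Flink's `ExceptionUtils` methods.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class CompletionExceptionDemo {

    // Hypothetical helper: unwrap (possibly nested) CompletionExceptions.
    static Throwable strip(Throwable t) {
        Throwable current = t;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Hypothetical helper: does the cause chain contain an instance of `type`?
    // Assumes an acyclic cause chain.
    static boolean containsCause(Throwable t, Class<? extends Throwable> type) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return true;
            }
        }
        return false;
    }

    // The failure a handle/whenComplete callback on `future` would receive.
    static Throwable observedFailure(CompletableFuture<?> future) {
        return future.handle((value, failure) -> failure).join();
    }

    public static void main(String[] args) {
        IllegalStateException cause = new IllegalStateException("rpc failed");

        // Completed directly: callbacks see the original exception.
        CompletableFuture<Object> direct = new CompletableFuture<>();
        direct.completeExceptionally(cause);

        // Derived stage: callbacks see a CompletionException wrapping it.
        CompletableFuture<Object> derived = direct.thenApply(v -> v);

        System.out.println(observedFailure(direct).getClass().getSimpleName()); // IllegalStateException
        System.out.println(observedFailure(derived).getClass().getSimpleName()); // CompletionException
        System.out.println(strip(observedFailure(derived)) == cause); // true
    }
}
```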
--
This is an automated message from the Apache Git Service.