zentol commented on a change in pull request #17606:
URL: https://github.com/apache/flink/pull/17606#discussion_r740932666
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
##########
@@ -67,7 +67,7 @@ public void testTaskManagerFailure(Configuration configuration, final File coord
ExecutionEnvironment env =
ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration);
env.setParallelism(PARALLELISM);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1500L));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1500L));
Review comment:
Whether the issue occurs depends on when the first heartbeat is sent after
the TM went down (at which point the TM would be considered unreachable).
If that happens before the restart, we properly remove the TM and don't
use it later on.
If it does not happen before the restart, the job fails a second time later
on, while attempting to deploy a task to the same TM.
I think this change just ever so slightly adjusts the timings to make the
issue more common; it can already happen on the current master.
Review comment:
- JM sends a heartbeat request to TM1 (transmitted, but may not get a response)
- TM1 is killed
- job restarts (not delayed by the missing TM1 because the cancelTask RPCs fail immediately)
- JM sends a heartbeat request to TM1 (not transmitted)
- job restarts a second time
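
To make the ordering in the timeline above concrete, here is a minimal sketch of the race as a pure function. It only models the reasoning in this thread; the class, method, and parameter names are hypothetical and do not exist in Flink.

```java
// Hypothetical model of the race; none of these names are Flink APIs.
final class HeartbeatRaceSketch {

    enum Outcome { TM_REMOVED_BEFORE_REDEPLOY, SECOND_FAILURE_ON_REDEPLOY }

    /**
     * @param firstFailedHeartbeatMs time at which the first heartbeat request
     *                               sent after the TM was killed fails
     * @param redeployMs             time at which the restarted job deploys tasks again
     */
    static Outcome simulate(long firstFailedHeartbeatMs, long redeployMs) {
        // If the JM notices the dead TM before it redeploys, the TM is removed
        // and not used again. Otherwise tasks are deployed to the dead TM and
        // the job fails a second time.
        return firstFailedHeartbeatMs < redeployMs
                ? Outcome.TM_REMOVED_BEFORE_REDEPLOY
                : Outcome.SECOND_FAILURE_ON_REDEPLOY;
    }

    public static void main(String[] args) {
        // Illustrative timestamps only; they are not taken from any log.
        System.out.println(simulate(1_000L, 1_500L));
        System.out.println(simulate(2_000L, 1_500L));
    }
}
```

In this model the only thing that decides the outcome is which of the two events comes first, which is why small timing shifts can make the second failure more or less frequent.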
Review comment:
I can also reproduce it locally where there are no processing gaps.
Review comment:
The restart delay is a fair point, I'll check the logs again.
Review comment:
It looks like the failed heartbeats are being ignored while we are
waiting for the restart delay:
```
// 64589-0ef458 is the killed TM
// this is the last heartbeat
17532 o.a.f.r.jobmaster.JobMaster [] - Received heartbeat from 127.0.0.1:64589-0ef458.
...
<start job restart>
<cancel slot requests>
<cleanup partitions>
<various failed cancelTask RPCs>
17740 <reduce resource requirements to 0>
...
17440... JM idling, sending heartbeat requests
19212 o.a.f.r.jobmaster.JobMaster [] - Archive local failure causing attempt 05bcf9159a5a301d2f7b6566111235da to fail
...
19213 o.a.f.r.executiongraph.ExecutionGraph [] - Job Flink Java Job at Mon Nov 01 15:42:30 CET 2021 (4daf5dcbf65f7cd384ac228ad72ab5c6) switched from state RESTARTING to RUNNING.
19777 o.a.f.r.jobmaster.JobMaster [] - TaskManager with id 127.0.0.1:64589-0ef458 is no longer reachable.
19777 o.a.f.r.jobmaster.JobMaster [] - Disconnect TaskExecutor 127.0.0.1:64589-0ef458 because: TaskManager with id 127.0.0.1:64589-0ef458 is no longer reachable.
```
Review comment:
I'm wondering if the exception the HeartbeatMonitorImpl sees could be
wrapped in a CompletionException...
Review comment:
💢
```
21440 o.a.f.r.j.JobMaster [] - #handleHeartbeatRpcFailure exception
java.util.concurrent.CompletionException:
org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException
```
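
For context on why the wrapper shows up, here is a small self-contained sketch using only `java.util.concurrent`. A callback registered on a dependent stage (for example, the result of `thenApply`) receives the original failure wrapped in a `CompletionException`, so a direct `instanceof` check against the original type does not match. `UnreachableException` is a hypothetical stand-in for `RecipientUnreachableException`; this is not the actual Flink code path.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class CompletionExceptionWrappingSketch {

    // Hypothetical stand-in for Flink's RecipientUnreachableException.
    static final class UnreachableException extends RuntimeException {
        UnreachableException(String message) {
            super(message);
        }
    }

    /** Returns the Throwable that a callback on a dependent stage receives. */
    static Throwable observedOnDependentStage() {
        CompletableFuture<String> rpcResult = new CompletableFuture<>();
        // Any dependent stage will do; thenApply is just the simplest one.
        CompletableFuture<String> dependent = rpcResult.thenApply(s -> s);

        Throwable[] seen = new Throwable[1];
        dependent.whenComplete((value, failure) -> seen[0] = failure);

        // Fail the source future; the dependent stage completes in this thread.
        rpcResult.completeExceptionally(new UnreachableException("TM is gone"));
        return seen[0];
    }

    public static void main(String[] args) {
        Throwable failure = observedOnDependentStage();
        // The callback sees the wrapper, not the original exception type.
        System.out.println(failure instanceof UnreachableException);
        System.out.println(failure instanceof CompletionException);
        System.out.println(failure.getCause() instanceof UnreachableException);
    }
}
```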
Review comment:
I suppose stripping the `CompletionException` in the
`HeartbeatManagerImpl` should be done in any case because it is so easy to
introduce bugs like this.
I'm curious though whether we should revert the `AkkaInvocationHandler`
to manually forwarding the result again...
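
A rough sketch of the two options mentioned above, using only `java.util.concurrent`: stripping the wrapper before inspecting the failure, and manually forwarding a result into a fresh future so that callbacks see the original exception. The class and method names (`strip`, `forwardManually`, `failureSeenBy`) are hypothetical and are not the actual `HeartbeatManagerImpl` or `AkkaInvocationHandler` code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class StripAndForwardSketch {

    /** Unwraps (possibly nested) CompletionExceptions; hypothetical helper. */
    static Throwable strip(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    /** Manually forwards the outcome of {@code source} into a fresh future. */
    static <T> CompletableFuture<T> forwardManually(CompletableFuture<T> source) {
        CompletableFuture<T> target = new CompletableFuture<>();
        source.whenComplete((value, failure) -> {
            if (failure != null) {
                // The target is completed directly, so no extra wrapper is added here.
                target.completeExceptionally(failure);
            } else {
                target.complete(value);
            }
        });
        return target;
    }

    /** Returns the failure a callback on an already completed future receives. */
    static Throwable failureSeenBy(CompletableFuture<?> future) {
        Throwable[] seen = new Throwable[1];
        future.whenComplete((value, failure) -> seen[0] = failure);
        return seen[0];
    }

    public static void main(String[] args) {
        IllegalStateException cause = new IllegalStateException("unreachable");
        CompletableFuture<String> source = new CompletableFuture<>();
        source.completeExceptionally(cause);

        // Manual forwarding hands the original exception to callbacks.
        System.out.println(failureSeenBy(forwardManually(source)) == cause);
        // A dependent stage hands over a wrapper, which strip() removes.
        Throwable wrapped = failureSeenBy(source.thenApply(s -> s));
        System.out.println(strip(wrapped) == cause);
    }
}
```

Stripping at the point of inspection is the more defensive of the two, since it works regardless of how the future handed to the caller was produced.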