zentol commented on a change in pull request #9038:
[FLINK-13169][tests][coordination] IT test for fine-grained recovery (task
executor failures)
URL: https://github.com/apache/flink/pull/9038#discussion_r302028207
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
##########
@@ -101,84 +132,116 @@ public void teardown() throws Exception {
public void testProgram() throws Exception {
ExecutionEnvironment env = createExecutionEnvironment();
- StaticFailureCounter.reset();
- StaticMapFailureTracker.reset();
-
- FailureStrategy failureStrategy = new
RandomExceptionFailureStrategy(1, EMITTED_RECORD_NUMBER);
+ FailureStrategy failureStrategy = createFailureStrategy();
DataSet<Long> input = env.generateSequence(0,
EMITTED_RECORD_NUMBER - 1);
- for (int i = 0; i < MAP_NUMBER; i++) {
+ for (int trackingIndex = 0; trackingIndex < MAP_NUMBER;
trackingIndex++) {
input = input
.mapPartition(new
TestPartitionMapper(StaticMapFailureTracker.addNewMap(), failureStrategy))
- .name("Test partition mapper " + i);
+ .name(TASK_NAME_PREFIX + trackingIndex);
}
assertThat(input.collect(), is(EXPECTED_JOB_OUTPUT));
StaticMapFailureTracker.verify();
}
- private ExecutionEnvironment createExecutionEnvironment() {
+ private static FailureStrategy createFailureStrategy() {
+ CoinToss coin = new CoinToss(1, EMITTED_RECORD_NUMBER);
+ return new JoinedFailureStrategy(
+ new RandomExceptionFailureStrategy(coin),
+ new RandomTaskExecutorFailureStrategy(coin));
+ }
+
+ private static ExecutionEnvironment createExecutionEnvironment() {
@SuppressWarnings("StaticVariableUsedBeforeInitialization")
ExecutionEnvironment env = new TestEnvironment(miniCluster, 1,
true);
-
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_FAILURE_NUMBER,
Time.milliseconds(10)));
+
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_JOB_RESTART_ATTEMPTS,
Time.milliseconds(10)));
env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
// forces all partitions to be blocking
return env;
}
- private enum StaticMapFailureTracker {
- ;
+ @SuppressWarnings({"StaticVariableUsedBeforeInitialization",
"OverlyBroadThrowsClause"})
+ private static void restartTaskManager() throws Exception {
+ int tmi = lastTaskManagerIndexInMiniCluster.getAndIncrement();
+ miniCluster.terminateTaskExecutor(tmi).get();
Review comment:
This could lead to a deadlock once the TaskExecutor (TE) waits for the task
termination future: the task thread tries to kill the TE and blocks on
`terminateTaskExecutor(tmi).get()`, while the TE waits for the task to shut
down and blocks as well.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services