azagrebin commented on a change in pull request #9038: 
[FLINK-13169][tests][coordination] IT test for fine-grained recovery (task 
executor failures)
URL: https://github.com/apache/flink/pull/9038#discussion_r302502844
 
 

 ##########
 File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
 ##########
 @@ -101,84 +132,116 @@ public void teardown() throws Exception {
        public void testProgram() throws Exception {
                ExecutionEnvironment env = createExecutionEnvironment();
 
-               StaticFailureCounter.reset();
-               StaticMapFailureTracker.reset();
-
-               FailureStrategy failureStrategy = new 
RandomExceptionFailureStrategy(1, EMITTED_RECORD_NUMBER);
+               FailureStrategy failureStrategy = createFailureStrategy();
 
                DataSet<Long> input = env.generateSequence(0, 
EMITTED_RECORD_NUMBER - 1);
-               for (int i = 0; i < MAP_NUMBER; i++) {
+               for (int trackingIndex = 0; trackingIndex < MAP_NUMBER; 
trackingIndex++) {
                        input = input
                                .mapPartition(new 
TestPartitionMapper(StaticMapFailureTracker.addNewMap(), failureStrategy))
-                               .name("Test partition mapper " + i);
+                               .name(TASK_NAME_PREFIX + trackingIndex);
                }
                assertThat(input.collect(), is(EXPECTED_JOB_OUTPUT));
 
                StaticMapFailureTracker.verify();
        }
 
-       private ExecutionEnvironment createExecutionEnvironment() {
+       private static FailureStrategy createFailureStrategy() {
+               CoinToss coin = new CoinToss(1, EMITTED_RECORD_NUMBER);
+               return new JoinedFailureStrategy(
+                       new RandomExceptionFailureStrategy(coin),
+                       new RandomTaskExecutorFailureStrategy(coin));
+       }
+
+       private static ExecutionEnvironment createExecutionEnvironment() {
                @SuppressWarnings("StaticVariableUsedBeforeInitialization")
                ExecutionEnvironment env = new TestEnvironment(miniCluster, 1, 
true);
-               
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_FAILURE_NUMBER, 
Time.milliseconds(10)));
+               
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_JOB_RESTART_ATTEMPTS,
 Time.milliseconds(10)));
                env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED); 
// forces all partitions to be blocking
                return env;
        }
 
-       private enum StaticMapFailureTracker {
-               ;
+       @SuppressWarnings({"StaticVariableUsedBeforeInitialization", 
"OverlyBroadThrowsClause"})
+       private static void restartTaskManager() throws Exception {
+               int tmi = lastTaskManagerIndexInMiniCluster.getAndIncrement();
+               miniCluster.terminateTaskExecutor(tmi).get();
 
 Review comment:
   The task should get interrupted at some point in worst case and the 
`InterruptedException` is ignored in `RandomTaskExecutorFailureStrategy.fail` 
but JM usually get informed about TM shutdown earlier and fails the task and 
the job.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to