azagrebin commented on a change in pull request #9038: 
[FLINK-13169][tests][coordination] IT test for fine-grained recovery (task 
executor failures)
URL: https://github.com/apache/flink/pull/9038#discussion_r302503104
 
 

 ##########
 File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
 ##########
 @@ -101,84 +132,116 @@ public void teardown() throws Exception {
        public void testProgram() throws Exception {
                ExecutionEnvironment env = createExecutionEnvironment();
 
-               StaticFailureCounter.reset();
-               StaticMapFailureTracker.reset();
-
-               FailureStrategy failureStrategy = new 
RandomExceptionFailureStrategy(1, EMITTED_RECORD_NUMBER);
+               FailureStrategy failureStrategy = createFailureStrategy();
 
                DataSet<Long> input = env.generateSequence(0, 
EMITTED_RECORD_NUMBER - 1);
-               for (int i = 0; i < MAP_NUMBER; i++) {
+               for (int trackingIndex = 0; trackingIndex < MAP_NUMBER; 
trackingIndex++) {
                        input = input
                                .mapPartition(new 
TestPartitionMapper(StaticMapFailureTracker.addNewMap(), failureStrategy))
-                               .name("Test partition mapper " + i);
+                               .name(TASK_NAME_PREFIX + trackingIndex);
                }
                assertThat(input.collect(), is(EXPECTED_JOB_OUTPUT));
 
                StaticMapFailureTracker.verify();
        }
 
-       private ExecutionEnvironment createExecutionEnvironment() {
+       private static FailureStrategy createFailureStrategy() {
+               CoinToss coin = new CoinToss(1, EMITTED_RECORD_NUMBER);
+               return new JoinedFailureStrategy(
+                       new RandomExceptionFailureStrategy(coin),
+                       new RandomTaskExecutorFailureStrategy(coin));
+       }
+
+       private static ExecutionEnvironment createExecutionEnvironment() {
                @SuppressWarnings("StaticVariableUsedBeforeInitialization")
                ExecutionEnvironment env = new TestEnvironment(miniCluster, 1, 
true);
-               
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_FAILURE_NUMBER, 
Time.milliseconds(10)));
+               
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_JOB_RESTART_ATTEMPTS,
 Time.milliseconds(10)));
                env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED); 
// forces all partitions to be blocking
                return env;
        }
 
-       private enum StaticMapFailureTracker {
-               ;
+       @SuppressWarnings({"StaticVariableUsedBeforeInitialization", 
"OverlyBroadThrowsClause"})
+       private static void restartTaskManager() throws Exception {
+               int tmi = lastTaskManagerIndexInMiniCluster.getAndIncrement();
+               miniCluster.terminateTaskExecutor(tmi).get();
+               miniCluster.startTaskExecutor();
+       }
 
-               private static final List<AtomicInteger> mapRestarts = new 
ArrayList<>(10);
-               private static final List<AtomicInteger> expectedMapRestarts = 
new ArrayList<>(10);
+       @FunctionalInterface
+       private interface FailureStrategy extends Serializable {
+               boolean failOrNot(int trackingIndex);
+       }
 
-               private static void reset() {
-                       mapRestarts.clear();
-                       expectedMapRestarts.clear();
-               }
+       private static class JoinedFailureStrategy implements FailureStrategy {
+               private static final long serialVersionUID = 1L;
 
-               private static int addNewMap() {
-                       mapRestarts.add(new AtomicInteger(0));
-                       expectedMapRestarts.add(new AtomicInteger(1));
-                       return mapRestarts.size() - 1;
-               }
+               private final FailureStrategy[] failureStrategies;
 
-               private static void mapRestart(int index) {
-                       mapRestarts.get(index).incrementAndGet();
+               private JoinedFailureStrategy(FailureStrategy ... 
failureStrategies) {
+                       this.failureStrategies = failureStrategies;
                }
 
-               private static void mapFailure(int index) {
-                       expectedMapRestarts.get(index).incrementAndGet();
+               @Override
+               public boolean failOrNot(int trackingIndex) {
+                       return Arrays
+                               .stream(failureStrategies)
+                               .anyMatch(failureStrategy -> 
failureStrategy.failOrNot(trackingIndex));
                }
+       }
 
-               private static void verify() {
-                       assertThat(collect(mapRestarts), 
is(collect(expectedMapRestarts)));
+       private static class RandomExceptionFailureStrategy extends 
AbstractRandomFailureStrategy {
+               private static final long serialVersionUID = 1L;
+
+               private RandomExceptionFailureStrategy(CoinToss coin) {
+                       super(coin);
                }
 
-               private static int[] collect(Collection<AtomicInteger> list) {
-                       return 
list.stream().mapToInt(AtomicInteger::get).toArray();
+               @Override
+               void fail(int trackingIndex) {
+                       StaticMapFailureTracker.mapFailure(trackingIndex);
+                       throw new FlinkRuntimeException("BAGA-BOOM!!! The user 
function generated test failure.");
                }
        }
 
-       @FunctionalInterface
-       private interface FailureStrategy extends Serializable {
-               void failOrNot();
+       private static class RandomTaskExecutorFailureStrategy extends 
AbstractRandomFailureStrategy {
+               private static final long serialVersionUID = 1L;
+
+               private RandomTaskExecutorFailureStrategy(CoinToss coin) {
+                       super(coin);
+               }
+
+               @Override
+               void fail(int trackingIndex) {
+                       
StaticMapFailureTracker.mapFailureWithBacktracking(trackingIndex);
+                       try {
+                               restartTaskManager();
+                       } catch (InterruptedException e) {
+                               Thread.currentThread().interrupt();
+                       } catch (Exception e) {
+                               ExceptionUtils.rethrow(e);
 
 Review comment:
   True, I will add tracking and verification of unexpected failures in 
`StaticMapFailureTracker`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to