zentol commented on a change in pull request #9038: 
[FLINK-13169][tests][coordination] IT test for fine-grained recovery (task 
executor failures)
URL: https://github.com/apache/flink/pull/9038#discussion_r302027879
 
 

 ##########
 File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
 ##########
 @@ -101,84 +132,116 @@ public void teardown() throws Exception {
        public void testProgram() throws Exception {
                ExecutionEnvironment env = createExecutionEnvironment();
 
-               StaticFailureCounter.reset();
-               StaticMapFailureTracker.reset();
-
-               FailureStrategy failureStrategy = new 
RandomExceptionFailureStrategy(1, EMITTED_RECORD_NUMBER);
+               FailureStrategy failureStrategy = createFailureStrategy();
 
                DataSet<Long> input = env.generateSequence(0, 
EMITTED_RECORD_NUMBER - 1);
-               for (int i = 0; i < MAP_NUMBER; i++) {
+               for (int trackingIndex = 0; trackingIndex < MAP_NUMBER; 
trackingIndex++) {
                        input = input
                                .mapPartition(new 
TestPartitionMapper(StaticMapFailureTracker.addNewMap(), failureStrategy))
-                               .name("Test partition mapper " + i);
+                               .name(TASK_NAME_PREFIX + trackingIndex);
                }
                assertThat(input.collect(), is(EXPECTED_JOB_OUTPUT));
 
                StaticMapFailureTracker.verify();
        }
 
-       private ExecutionEnvironment createExecutionEnvironment() {
+       private static FailureStrategy createFailureStrategy() {
+               CoinToss coin = new CoinToss(1, EMITTED_RECORD_NUMBER);
+               return new JoinedFailureStrategy(
+                       new RandomExceptionFailureStrategy(coin),
+                       new RandomTaskExecutorFailureStrategy(coin));
+       }
+
+       private static ExecutionEnvironment createExecutionEnvironment() {
                @SuppressWarnings("StaticVariableUsedBeforeInitialization")
                ExecutionEnvironment env = new TestEnvironment(miniCluster, 1, 
true);
-               
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_FAILURE_NUMBER, 
Time.milliseconds(10)));
+               
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_JOB_RESTART_ATTEMPTS,
 Time.milliseconds(10)));
                env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED); 
// forces all partitions to be blocking
                return env;
        }
 
-       private enum StaticMapFailureTracker {
-               ;
+       @SuppressWarnings({"StaticVariableUsedBeforeInitialization", 
"OverlyBroadThrowsClause"})
+       private static void restartTaskManager() throws Exception {
+               int tmi = lastTaskManagerIndexInMiniCluster.getAndIncrement();
+               miniCluster.terminateTaskExecutor(tmi).get();
+               miniCluster.startTaskExecutor();
+       }
 
-               private static final List<AtomicInteger> mapRestarts = new 
ArrayList<>(10);
-               private static final List<AtomicInteger> expectedMapRestarts = 
new ArrayList<>(10);
+       @FunctionalInterface
+       private interface FailureStrategy extends Serializable {
+               boolean failOrNot(int trackingIndex);
+       }
 
-               private static void reset() {
-                       mapRestarts.clear();
-                       expectedMapRestarts.clear();
-               }
+       private static class JoinedFailureStrategy implements FailureStrategy {
+               private static final long serialVersionUID = 1L;
 
-               private static int addNewMap() {
-                       mapRestarts.add(new AtomicInteger(0));
-                       expectedMapRestarts.add(new AtomicInteger(1));
-                       return mapRestarts.size() - 1;
-               }
+               private final FailureStrategy[] failureStrategies;
 
-               private static void mapRestart(int index) {
-                       mapRestarts.get(index).incrementAndGet();
+               private JoinedFailureStrategy(FailureStrategy ... 
failureStrategies) {
+                       this.failureStrategies = failureStrategies;
                }
 
-               private static void mapFailure(int index) {
-                       expectedMapRestarts.get(index).incrementAndGet();
+               @Override
+               public boolean failOrNot(int trackingIndex) {
+                       return Arrays
+                               .stream(failureStrategies)
+                               .anyMatch(failureStrategy -> 
failureStrategy.failOrNot(trackingIndex));
                }
+       }
 
-               private static void verify() {
-                       assertThat(collect(mapRestarts), 
is(collect(expectedMapRestarts)));
+       private static class RandomExceptionFailureStrategy extends 
AbstractRandomFailureStrategy {
+               private static final long serialVersionUID = 1L;
+
+               private RandomExceptionFailureStrategy(CoinToss coin) {
+                       super(coin);
                }
 
-               private static int[] collect(Collection<AtomicInteger> list) {
-                       return 
list.stream().mapToInt(AtomicInteger::get).toArray();
+               @Override
+               void fail(int trackingIndex) {
+                       StaticMapFailureTracker.mapFailure(trackingIndex);
+                       throw new FlinkRuntimeException("BAGA-BOOM!!! The user 
function generated test failure.");
                }
        }
 
-       @FunctionalInterface
-       private interface FailureStrategy extends Serializable {
-               void failOrNot();
+       private static class RandomTaskExecutorFailureStrategy extends 
AbstractRandomFailureStrategy {
+               private static final long serialVersionUID = 1L;
+
+               private RandomTaskExecutorFailureStrategy(CoinToss coin) {
+                       super(coin);
+               }
+
+               @Override
+               void fail(int trackingIndex) {
+                       
StaticMapFailureTracker.mapFailureWithBacktracking(trackingIndex);
+                       try {
+                               restartTaskManager();
+                       } catch (InterruptedException e) {
+                               Thread.currentThread().interrupt();
+                       } catch (Exception e) {
+                               ExceptionUtils.rethrow(e);
+                       }
+               }
        }
 
-       private static class RandomExceptionFailureStrategy implements 
FailureStrategy {
+       private abstract static class AbstractRandomFailureStrategy implements 
FailureStrategy {
                private static final long serialVersionUID = 1L;
 
                private final CoinToss coin;
 
-               private RandomExceptionFailureStrategy(int probFraction, int 
probBase) {
-                       this.coin = new CoinToss(probFraction, probBase);
+               private AbstractRandomFailureStrategy(CoinToss coin) {
+                       this.coin = coin;
                }
 
                @Override
-               public void failOrNot() {
-                       if (coin.toss() && StaticFailureCounter.failOrNot()) {
-                               throw new FlinkRuntimeException("BAGA-BOOM!!! 
The user function generated test failure.");
+               public boolean failOrNot(int trackingIndex) {
 
 Review comment:
   What's the idea behind returning a boolean? The 
`RandomExceptionFailureStrategy` throws an exception right away, and the other 
kills the TM, which should also result in a new instance being created (after 
all, there is only 1 TM).
   
   In which cases would the return value actually be used?
   
   EDIT: OK, so this is used in `JoinedFailureStrategy#failOrNot` to prevent 
multiple strategies from issuing failures. This is a fairly odd way of 
accomplishing this imo; I would prefer that we separate the determination 
of whether we should fail from the actual failure.
   Something like `boolean shouldFail()` and `void fail() throws Exception`, 
which you could then use in the `JoinedFailureStrategy` via 
`Arrays.stream(failureStrategies).filter(FS::shouldFail).findFirst().ifPresent(FS::fail)` 
(note that `anyMatch` returns a `boolean`, so it cannot be chained with `ifPresent`; 
and the checked exception from `fail()` has to be wrapped for the method reference to compile).
   You can still have a default `failOrNot` implementation that does both at 
once for easier use in the functions.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to