zentol commented on a change in pull request #9038:
[FLINK-13169][tests][coordination] IT test for fine-grained recovery (task
executor failures)
URL: https://github.com/apache/flink/pull/9038#discussion_r302027879
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
##########
@@ -101,84 +132,116 @@ public void teardown() throws Exception {
public void testProgram() throws Exception {
ExecutionEnvironment env = createExecutionEnvironment();
- StaticFailureCounter.reset();
- StaticMapFailureTracker.reset();
-
- FailureStrategy failureStrategy = new
RandomExceptionFailureStrategy(1, EMITTED_RECORD_NUMBER);
+ FailureStrategy failureStrategy = createFailureStrategy();
DataSet<Long> input = env.generateSequence(0,
EMITTED_RECORD_NUMBER - 1);
- for (int i = 0; i < MAP_NUMBER; i++) {
+ for (int trackingIndex = 0; trackingIndex < MAP_NUMBER;
trackingIndex++) {
input = input
.mapPartition(new
TestPartitionMapper(StaticMapFailureTracker.addNewMap(), failureStrategy))
- .name("Test partition mapper " + i);
+ .name(TASK_NAME_PREFIX + trackingIndex);
}
assertThat(input.collect(), is(EXPECTED_JOB_OUTPUT));
StaticMapFailureTracker.verify();
}
- private ExecutionEnvironment createExecutionEnvironment() {
+ private static FailureStrategy createFailureStrategy() {
+ CoinToss coin = new CoinToss(1, EMITTED_RECORD_NUMBER);
+ return new JoinedFailureStrategy(
+ new RandomExceptionFailureStrategy(coin),
+ new RandomTaskExecutorFailureStrategy(coin));
+ }
+
+ private static ExecutionEnvironment createExecutionEnvironment() {
@SuppressWarnings("StaticVariableUsedBeforeInitialization")
ExecutionEnvironment env = new TestEnvironment(miniCluster, 1,
true);
-
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_FAILURE_NUMBER,
Time.milliseconds(10)));
+
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_JOB_RESTART_ATTEMPTS,
Time.milliseconds(10)));
env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
// forces all partitions to be blocking
return env;
}
- private enum StaticMapFailureTracker {
- ;
+ @SuppressWarnings({"StaticVariableUsedBeforeInitialization",
"OverlyBroadThrowsClause"})
+ private static void restartTaskManager() throws Exception {
+ int tmi = lastTaskManagerIndexInMiniCluster.getAndIncrement();
+ miniCluster.terminateTaskExecutor(tmi).get();
+ miniCluster.startTaskExecutor();
+ }
- private static final List<AtomicInteger> mapRestarts = new
ArrayList<>(10);
- private static final List<AtomicInteger> expectedMapRestarts =
new ArrayList<>(10);
+ @FunctionalInterface
+ private interface FailureStrategy extends Serializable {
+ boolean failOrNot(int trackingIndex);
+ }
- private static void reset() {
- mapRestarts.clear();
- expectedMapRestarts.clear();
- }
+ private static class JoinedFailureStrategy implements FailureStrategy {
+ private static final long serialVersionUID = 1L;
- private static int addNewMap() {
- mapRestarts.add(new AtomicInteger(0));
- expectedMapRestarts.add(new AtomicInteger(1));
- return mapRestarts.size() - 1;
- }
+ private final FailureStrategy[] failureStrategies;
- private static void mapRestart(int index) {
- mapRestarts.get(index).incrementAndGet();
+ private JoinedFailureStrategy(FailureStrategy ...
failureStrategies) {
+ this.failureStrategies = failureStrategies;
}
- private static void mapFailure(int index) {
- expectedMapRestarts.get(index).incrementAndGet();
+ @Override
+ public boolean failOrNot(int trackingIndex) {
+ return Arrays
+ .stream(failureStrategies)
+ .anyMatch(failureStrategy ->
failureStrategy.failOrNot(trackingIndex));
}
+ }
- private static void verify() {
- assertThat(collect(mapRestarts),
is(collect(expectedMapRestarts)));
+ private static class RandomExceptionFailureStrategy extends
AbstractRandomFailureStrategy {
+ private static final long serialVersionUID = 1L;
+
+ private RandomExceptionFailureStrategy(CoinToss coin) {
+ super(coin);
}
- private static int[] collect(Collection<AtomicInteger> list) {
- return
list.stream().mapToInt(AtomicInteger::get).toArray();
+ @Override
+ void fail(int trackingIndex) {
+ StaticMapFailureTracker.mapFailure(trackingIndex);
+ throw new FlinkRuntimeException("BAGA-BOOM!!! The user
function generated test failure.");
}
}
- @FunctionalInterface
- private interface FailureStrategy extends Serializable {
- void failOrNot();
+ private static class RandomTaskExecutorFailureStrategy extends
AbstractRandomFailureStrategy {
+ private static final long serialVersionUID = 1L;
+
+ private RandomTaskExecutorFailureStrategy(CoinToss coin) {
+ super(coin);
+ }
+
+ @Override
+ void fail(int trackingIndex) {
+
StaticMapFailureTracker.mapFailureWithBacktracking(trackingIndex);
+ try {
+ restartTaskManager();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ ExceptionUtils.rethrow(e);
+ }
+ }
}
- private static class RandomExceptionFailureStrategy implements
FailureStrategy {
+ private abstract static class AbstractRandomFailureStrategy implements
FailureStrategy {
private static final long serialVersionUID = 1L;
private final CoinToss coin;
- private RandomExceptionFailureStrategy(int probFraction, int
probBase) {
- this.coin = new CoinToss(probFraction, probBase);
+ private AbstractRandomFailureStrategy(CoinToss coin) {
+ this.coin = coin;
}
@Override
- public void failOrNot() {
- if (coin.toss() && StaticFailureCounter.failOrNot()) {
- throw new FlinkRuntimeException("BAGA-BOOM!!!
The user function generated test failure.");
+ public boolean failOrNot(int trackingIndex) {
Review comment:
What's the idea behind returning a boolean? The
`RandomExceptionFailureStrategy` throws an exception right away, and the other one
kills the TM, which should also result in a new instance being created (after
all, there is only 1 TM).
In which cases would the return value actually be used?
EDIT: OK, so this is used in `JoinedFailureStrategy#failOrNot` to prevent
multiple strategies from issuing failures. This is a fairly odd way of
accomplishing this, in my opinion; I would prefer to separate the decision
of whether we should fail from the actual failure.
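Something like `boolean shouldFail()` and `void fail() throws Exception`,
which you could then use in the `JoinedFailureStrategy` via
`Arrays.stream(failureStrategies).filter(FS::shouldFail).findFirst()` and then
invoke `fail()` on the result (`anyMatch` returns a plain `boolean`, so it cannot
be chained with `ifPresent`; and since `fail()` throws a checked exception, it
cannot be passed directly as a `Consumer` method reference, so use an explicit
`if`/loop or an unchecked wrapper).
You can still have a default `failOrNot` implementation that does both at
once for easier use in the functions.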
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services