zentol commented on a change in pull request #9038:
[FLINK-13169][tests][coordination] IT test for fine-grained recovery (task executor failures)
URL: https://github.com/apache/flink/pull/9038#discussion_r303859894
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
##########
@@ -101,84 +150,207 @@ public void teardown() throws Exception {
public void testProgram() throws Exception {
ExecutionEnvironment env = createExecutionEnvironment();
- StaticFailureCounter.reset();
- StaticMapFailureTracker.reset();
-
- FailureStrategy failureStrategy = new
RandomExceptionFailureStrategy(1, EMITTED_RECORD_NUMBER);
+ FailureStrategy failureStrategy = createFailureStrategy();
DataSet<Long> input = env.generateSequence(0,
EMITTED_RECORD_NUMBER - 1);
- for (int i = 0; i < MAP_NUMBER; i++) {
+ for (int trackingIndex = 0; trackingIndex < MAP_NUMBER;
trackingIndex++) {
input = input
.mapPartition(new
TestPartitionMapper(StaticMapFailureTracker.addNewMap(), failureStrategy))
- .name("Test partition mapper " + i);
+ .name(TASK_NAME_PREFIX + trackingIndex);
}
assertThat(input.collect(), is(EXPECTED_JOB_OUTPUT));
StaticMapFailureTracker.verify();
}
- private ExecutionEnvironment createExecutionEnvironment() {
+ private static FailureStrategy createFailureStrategy() {
+ CoinToss coin = new CoinToss(1, EMITTED_RECORD_NUMBER);
+ return new FixedFailureStrategy(
+ 1,
+ new LimitedGlobalFailureStrategy(
+ new JoinedFailureStrategy(
+ new
RandomExceptionFailureStrategy(coin),
+ new
RandomTaskExecutorFailureStrategy(coin))));
+ }
+
+ private static ExecutionEnvironment createExecutionEnvironment() {
@SuppressWarnings("StaticVariableUsedBeforeInitialization")
ExecutionEnvironment env = new TestEnvironment(miniCluster, 1,
true);
-
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_FAILURE_NUMBER,
Time.milliseconds(10)));
+
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_JOB_RESTART_ATTEMPTS,
Time.milliseconds(10)));
env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
// forces all partitions to be blocking
return env;
}
- private enum StaticMapFailureTracker {
+ @SuppressWarnings({"StaticVariableUsedBeforeInitialization",
"OverlyBroadThrowsClause"})
+ private static void restartTaskManager() throws Exception {
+ int tmi = lastTaskManagerIndexInMiniCluster.getAndIncrement();
+ try {
+ miniCluster.terminateTaskExecutor(tmi).get();
+ } finally {
+ miniCluster.startTaskExecutor();
+ }
+ }
+
+ @FunctionalInterface
+ private interface FailureStrategy extends Serializable {
+ /**
+ * Decides whether to fail and fails the task implicitly or by
throwing an exception.
+ *
+ * @param trackingIndex index of the mapper task in the sequence
+ * @return {@code true} if task is failed implicitly or {@code
false} if task is not failed
+ * @throws Exception To fail the task explicitly
+ */
+ boolean failOrNot(int trackingIndex) throws Exception;
+ }
+
+ private static class FixedFailureStrategy implements FailureStrategy {
+ private static final long serialVersionUID = 1L;
+
+ private final int maxFailureNumber;
+ private final FailureStrategy wrappedFailureStrategy;
+ private transient int failureNumber;
+
+ private FixedFailureStrategy(int maxFailureNumber,
FailureStrategy wrappedFailureStrategy) {
+ this.maxFailureNumber = maxFailureNumber;
+ this.wrappedFailureStrategy = wrappedFailureStrategy;
+ }
+
+ @Override
+ public boolean failOrNot(int trackingIndex) throws Exception {
+ if (failureNumber < maxFailureNumber) {
+ try {
+ boolean failed =
wrappedFailureStrategy.failOrNot(trackingIndex);
+ if (failed) {
+ failureNumber++;
+ }
+ return failed;
+ } catch (Exception e) {
+ failureNumber++;
Review comment:
What is the benefit of incrementing the count when an exception is thrown?
This strategy instance will never be queried again, since a fresh instance
is used on the next attempt.
My feeling is that you could simplify this whole limiting logic by enforcing
exactly one failure per mapper across the entire test, which you could
trivially track via the static failure counter.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services