zentol commented on a change in pull request #9038:
[FLINK-13169][tests][coordination] IT test for fine-grained recovery (task executor failures)
URL: https://github.com/apache/flink/pull/9038#discussion_r304867348
##########
File path: flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
##########
@@ -101,101 +166,276 @@ public void teardown() throws Exception {
public void testProgram() throws Exception {
ExecutionEnvironment env = createExecutionEnvironment();
- StaticFailureCounter.reset();
- StaticMapFailureTracker.reset();
-
- FailureStrategy failureStrategy = new RandomExceptionFailureStrategy(1, EMITTED_RECORD_NUMBER);
+ FailureStrategy failureStrategy = createFailureStrategy();
DataSet<Long> input = env.generateSequence(0, EMITTED_RECORD_NUMBER - 1);
- for (int i = 0; i < MAP_NUMBER; i++) {
+ for (int trackingIndex = 0; trackingIndex < MAP_NUMBER; trackingIndex++) {
input = input
.mapPartition(new TestPartitionMapper(StaticMapFailureTracker.addNewMap(), failureStrategy))
- .name("Test partition mapper " + i);
+ .name(TASK_NAME_PREFIX + trackingIndex);
}
- assertThat(input.collect(), is(EXPECTED_JOB_OUTPUT));
+ assertThat(input.collect(), is(EXPECTED_JOB_OUTPUT));
StaticMapFailureTracker.verify();
}
- private ExecutionEnvironment createExecutionEnvironment() {
+ private static FailureStrategy createFailureStrategy() {
+ int failWithExceptionAfterNumberOfProcessedRecords = rnd.nextInt(EMITTED_RECORD_NUMBER) + 1;
+ int failTaskExecutorAfterNumberOfProcessedRecords = rnd.nextInt(EMITTED_RECORD_NUMBER) + 1;
+ // it has to fail only once during one mapper run so that different failure strategies do not mess up each other stats
+ FailureStrategy failureStrategy = new OneTimeFailureStrategy(
+ new JoinedFailureStrategy(
Review comment:
This could be simplified further by randomly choosing the failure strategy for each mapper separately, up front, and passing only that strategy to the mapper. We would then have a fully determined test scenario before the job runs, which is functionally identical to the current state but (hopefully) easier to reproduce.
You also wouldn't need a List<Map> in the failure tracker; a plain list would do, since the mappers could re-use the trackingIndex. The equals/hashCode and UUID in the failure strategies could then be removed.
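A minimal sketch of the suggested restructuring, in Java to match the test. Everything here is hypothetical: `FailurePlanSketch`, `FailureKind`, `MapperFailure`, `plan` and `FailureTracker` are illustrative names rather than classes from the PR, and the `NONE` option (a mapper that never fails) is an assumption. Every mapper's failure is derived from one seeded `Random` before the job is built, so the scenario is fixed and can be logged; the tracker becomes a plain list indexed by `trackingIndex`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch: fix every mapper's failure up front from a single seed. */
public final class FailurePlanSketch {

    /** Hypothetical failure kinds; the real test's strategies may differ. */
    enum FailureKind { NONE, EXCEPTION, TASK_EXECUTOR }

    /** One mapper's predetermined failure: which kind, and after how many records. */
    static final class MapperFailure {
        final FailureKind kind;
        final int afterRecords;

        MapperFailure(FailureKind kind, int afterRecords) {
            this.kind = kind;
            this.afterRecords = afterRecords;
        }

        @Override
        public String toString() {
            return kind + "@" + afterRecords;
        }
    }

    /** Element i of the returned list is the plan for the mapper with trackingIndex i. */
    static List<MapperFailure> plan(long seed, int mapNumber, int emittedRecordNumber) {
        Random rnd = new Random(seed); // same seed -> same plan, so a failing run can be replayed
        FailureKind[] kinds = FailureKind.values();
        List<MapperFailure> plan = new ArrayList<>(mapNumber);
        for (int trackingIndex = 0; trackingIndex < mapNumber; trackingIndex++) {
            FailureKind kind = kinds[rnd.nextInt(kinds.length)];
            int afterRecords = rnd.nextInt(emittedRecordNumber) + 1; // in [1, emittedRecordNumber]
            plan.add(new MapperFailure(kind, afterRecords));
        }
        return Collections.unmodifiableList(plan);
    }

    /** Tracker as a plain list indexed by trackingIndex, instead of a List<Map>. */
    static final class FailureTracker {
        private final List<Integer> failuresPerMapper;

        FailureTracker(int mapNumber) {
            failuresPerMapper = new ArrayList<>(Collections.nCopies(mapNumber, 0));
        }

        void recordFailure(int trackingIndex) {
            failuresPerMapper.set(trackingIndex, failuresPerMapper.get(trackingIndex) + 1);
        }

        int failures(int trackingIndex) {
            return failuresPerMapper.get(trackingIndex);
        }
    }

    public static void main(String[] args) {
        long seed = 42L;
        // Printing the seed and plan up front is what makes the scenario reproducible.
        System.out.println("seed=" + seed + " plan=" + plan(seed, 3, 10));
    }
}
```

Logging the seed (or the printed plan) at the start of the test would be enough to replay a failing run. In the real ITCase the tracker would presumably stay static, since the mappers are serialized and shipped to the cluster; the instance-based tracker here only keeps the sketch self-contained.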