zentol commented on a change in pull request #9038:
[FLINK-13169][tests][coordination] IT test for fine-grained recovery (task
executor failures)
URL: https://github.com/apache/flink/pull/9038#discussion_r304868093
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
##########
@@ -101,101 +166,276 @@ public void teardown() throws Exception {
public void testProgram() throws Exception {
ExecutionEnvironment env = createExecutionEnvironment();
- StaticFailureCounter.reset();
- StaticMapFailureTracker.reset();
-
- FailureStrategy failureStrategy = new
RandomExceptionFailureStrategy(1, EMITTED_RECORD_NUMBER);
+ FailureStrategy failureStrategy = createFailureStrategy();
DataSet<Long> input = env.generateSequence(0,
EMITTED_RECORD_NUMBER - 1);
- for (int i = 0; i < MAP_NUMBER; i++) {
+ for (int trackingIndex = 0; trackingIndex < MAP_NUMBER;
trackingIndex++) {
input = input
.mapPartition(new
TestPartitionMapper(StaticMapFailureTracker.addNewMap(), failureStrategy))
- .name("Test partition mapper " + i);
+ .name(TASK_NAME_PREFIX + trackingIndex);
}
- assertThat(input.collect(), is(EXPECTED_JOB_OUTPUT));
+ assertThat(input.collect(), is(EXPECTED_JOB_OUTPUT));
StaticMapFailureTracker.verify();
}
- private ExecutionEnvironment createExecutionEnvironment() {
+ private static FailureStrategy createFailureStrategy() {
+ int failWithExceptionAfterNumberOfProcessedRecords =
rnd.nextInt(EMITTED_RECORD_NUMBER) + 1;
+ int failTaskExecutorAfterNumberOfProcessedRecords =
rnd.nextInt(EMITTED_RECORD_NUMBER) + 1;
+ // it has to fail only once during one mapper run so that
different failure strategies do not mess up each other stats
+ FailureStrategy failureStrategy = new OneTimeFailureStrategy(
+ new JoinedFailureStrategy(
+ new GloballyTrackingFailureStrategy(
Review comment:
if you accept the above suggestion, then I think we could merge this
strategy into the `OneTimeFailureStrategy`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services