zentol commented on a change in pull request #9038: 
[FLINK-13169][tests][coordination] IT test for fine-grained recovery (task 
executor failures)
URL: https://github.com/apache/flink/pull/9038#discussion_r304867348
 
 

 ##########
 File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
 ##########
 @@ -101,101 +166,276 @@ public void teardown() throws Exception {
        public void testProgram() throws Exception {
                ExecutionEnvironment env = createExecutionEnvironment();
 
-               StaticFailureCounter.reset();
-               StaticMapFailureTracker.reset();
-
-               FailureStrategy failureStrategy = new 
RandomExceptionFailureStrategy(1, EMITTED_RECORD_NUMBER);
+               FailureStrategy failureStrategy = createFailureStrategy();
 
                DataSet<Long> input = env.generateSequence(0, 
EMITTED_RECORD_NUMBER - 1);
-               for (int i = 0; i < MAP_NUMBER; i++) {
+               for (int trackingIndex = 0; trackingIndex < MAP_NUMBER; 
trackingIndex++) {
                        input = input
                                .mapPartition(new 
TestPartitionMapper(StaticMapFailureTracker.addNewMap(), failureStrategy))
-                               .name("Test partition mapper " + i);
+                               .name(TASK_NAME_PREFIX + trackingIndex);
                }
-               assertThat(input.collect(), is(EXPECTED_JOB_OUTPUT));
 
+               assertThat(input.collect(), is(EXPECTED_JOB_OUTPUT));
                StaticMapFailureTracker.verify();
        }
 
-       private ExecutionEnvironment createExecutionEnvironment() {
+       private static FailureStrategy createFailureStrategy() {
+               int failWithExceptionAfterNumberOfProcessedRecords = 
rnd.nextInt(EMITTED_RECORD_NUMBER) + 1;
+               int failTaskExecutorAfterNumberOfProcessedRecords = 
rnd.nextInt(EMITTED_RECORD_NUMBER) + 1;
+               // it has to fail only once during one mapper run so that 
different failure strategies do not mess up each other stats
+               FailureStrategy failureStrategy = new OneTimeFailureStrategy(
+                       new JoinedFailureStrategy(
 
 Review comment:
   could be further simplified by determining the strategy beforehand for each 
mapper separately at random and only passing that strategy. We would then have 
a fully determined test scenario before executing the test; which is 
functionally identical to the current state but (hopefully) easier to reproduce.
   You then also wouldn't need a List<Map> in the failure tracker; it could 
just be a list, since they could re-use the trackingIndex. The equals/hashcode 
and UUID in the failure strategies could then be removed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to