zentol commented on a change in pull request #9038: [FLINK-13169][tests][coordination] IT test for fine-grained recovery (task executor failures)
URL: https://github.com/apache/flink/pull/9038#discussion_r304868093
 
 

 ##########
 File path: flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
 ##########
 @@ -101,101 +166,276 @@ public void teardown() throws Exception {
        public void testProgram() throws Exception {
                ExecutionEnvironment env = createExecutionEnvironment();
 
-               StaticFailureCounter.reset();
-               StaticMapFailureTracker.reset();
-
-               FailureStrategy failureStrategy = new RandomExceptionFailureStrategy(1, EMITTED_RECORD_NUMBER);
+               FailureStrategy failureStrategy = createFailureStrategy();
 
                DataSet<Long> input = env.generateSequence(0, EMITTED_RECORD_NUMBER - 1);
-               for (int i = 0; i < MAP_NUMBER; i++) {
+               for (int trackingIndex = 0; trackingIndex < MAP_NUMBER; trackingIndex++) {
                        input = input
                                .mapPartition(new TestPartitionMapper(StaticMapFailureTracker.addNewMap(), failureStrategy))
-                               .name("Test partition mapper " + i);
+                               .name(TASK_NAME_PREFIX + trackingIndex);
                }
-               assertThat(input.collect(), is(EXPECTED_JOB_OUTPUT));
 
+               assertThat(input.collect(), is(EXPECTED_JOB_OUTPUT));
                StaticMapFailureTracker.verify();
        }
 
-       private ExecutionEnvironment createExecutionEnvironment() {
+       private static FailureStrategy createFailureStrategy() {
+               int failWithExceptionAfterNumberOfProcessedRecords = rnd.nextInt(EMITTED_RECORD_NUMBER) + 1;
+               int failTaskExecutorAfterNumberOfProcessedRecords = rnd.nextInt(EMITTED_RECORD_NUMBER) + 1;
+               // it has to fail only once during one mapper run so that different failure strategies do not mess up each other stats
+               FailureStrategy failureStrategy = new OneTimeFailureStrategy(
+                       new JoinedFailureStrategy(
+                               new GloballyTrackingFailureStrategy(
 
 Review comment:
   If you accept the above suggestion, then I think we could merge this strategy into the `OneTimeFailureStrategy`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
