tillrohrmann commented on a change in pull request #15882:
URL: https://github.com/apache/flink/pull/15882#discussion_r629915588



##########
File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -231,6 +232,43 @@ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Ex
         assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
     }
 
+    @Test
+    public void testCancellationOfJobInRestartLoop() throws Exception {

Review comment:
       Let's give this test a more expressive name, something like 
`testJobCancellationWhileRestartingSucceeds`.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -231,6 +232,43 @@ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Ex
         assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
     }
 
+    @Test
+    public void testCancellationOfJobInRestartLoop() throws Exception {
+        final long timeInRestartingState = 10000L;
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // configure a high delay between attempts: We'll stay in RESTARTING for 10 seconds.
+        env.setRestartStrategy(
+                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, timeInRestartingState));
+        env.addSource(new NotifyOnRunningAndFailingSource()).addSink(new DiscardingSink<>());
+
+        JobClient client = env.executeAsync();
+
+        NotifyOnRunningAndFailingSource.runningLatch.await();

Review comment:
       Why do we need this latch here?

##########
File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -231,6 +232,43 @@ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Ex
         assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
     }
 
+    @Test
+    public void testCancellationOfJobInRestartLoop() throws Exception {
+        final long timeInRestartingState = 10000L;
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // configure a high delay between attempts: We'll stay in RESTARTING for 10 seconds.
+        env.setRestartStrategy(
+                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, timeInRestartingState));
+        env.addSource(new NotifyOnRunningAndFailingSource()).addSink(new DiscardingSink<>());
+
+        JobClient client = env.executeAsync();
+
+        NotifyOnRunningAndFailingSource.runningLatch.await();
+
+        // wait until we are in RESTARTING state
+        CommonTestUtils.waitUntilCondition(
+                () -> client.getJobStatus().get() == JobStatus.RESTARTING,
+                Deadline.fromNow(Duration.of(timeInRestartingState, ChronoUnit.MILLIS)),
+                5);
+                5);
+
+        // now cancel while in RESTARTING state
+        client.cancel().get();
+    }

Review comment:
       Nit: Does this really have to live in the `flink-tests` module? It feels 
to me that this should be testable as an integration test in the 
`flink-runtime` module.
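
For reference, the "wait until we are in RESTARTING state" step in the test is a poll-with-deadline pattern. Below is a minimal JDK-only sketch of that pattern. The names `PollUtil` and `waitUntil` are hypothetical and chosen for illustration; this is not Flink's `CommonTestUtils` implementation, and the counter in `main` merely stands in for a job-status check.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical helper, for illustration only (not part of Flink).
public class PollUtil {

    /**
     * Re-evaluates the condition every retryIntervalMillis until it returns true,
     * or throws a TimeoutException once the timeout has elapsed.
     */
    public static void waitUntil(
            BooleanSupplier condition, Duration timeout, long retryIntervalMillis)
            throws InterruptedException, TimeoutException {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            // Compare via subtraction so the check stays correct if nanoTime overflows.
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new TimeoutException("Condition not met within " + timeout);
            }
            Thread.sleep(retryIntervalMillis);
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a status check: becomes true on the third poll.
        AtomicInteger polls = new AtomicInteger();
        waitUntil(() -> polls.incrementAndGet() >= 3, Duration.ofSeconds(10), 5);
        System.out.println("condition met after " + polls.get() + " polls");
    }
}
```

In the test under review, the condition would be the job-status comparison and the timeout would be the restart delay, so the poll gives up once the job could have left the restarting state.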



