tillrohrmann commented on a change in pull request #15882:
URL: https://github.com/apache/flink/pull/15882#discussion_r629915588
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -231,6 +232,43 @@ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Ex
assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
}
+ @Test
+ public void testCancellationOfJobInRestartLoop() throws Exception {
Review comment:
Let's give it a more expressive name, something like `testJobCancellationWhileRestartingSucceeds`.
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -231,6 +232,43 @@ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Ex
assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
}
+ @Test
+ public void testCancellationOfJobInRestartLoop() throws Exception {
+ final long timeInRestartingState = 10000L;
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // configure a high delay between attempts: We'll stay in RESTARTING for 10 seconds.
+ env.setRestartStrategy(
+ RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, timeInRestartingState));
+ env.addSource(new NotifyOnRunningAndFailingSource()).addSink(new DiscardingSink<>());
+
+ JobClient client = env.executeAsync();
+
+ NotifyOnRunningAndFailingSource.runningLatch.await();
Review comment:
Why do we need this latch here?
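`NotifyOnRunningAndFailingSource` is not part of this diff, so its implementation is not shown. Judging only from its name and the `runningLatch.await()` call, it presumably counts down a static latch once its `run` method starts and then throws to trigger a restart. A minimal plain-Java sketch of that handshake follows; `RunningLatchSketch`, `runAttempt` and `awaitFirstAttempt` are hypothetical names, and there is no Flink dependency.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a source that signals "running" via a static latch, then fails. */
class RunningLatchSketch {

    // Counted down by the first attempt that starts running; later countDown() calls are no-ops.
    static final CountDownLatch runningLatch = new CountDownLatch(1);
    static final AtomicInteger attempts = new AtomicInteger();

    /** Stand-in for a source's run(): signal that it is running, then fail. */
    static void runAttempt() {
        attempts.incrementAndGet();
        runningLatch.countDown();
        throw new RuntimeException("Artificial failure to trigger a restart");
    }

    /** Starts one attempt on another thread; returns true if it was observed running in time. */
    static boolean awaitFirstAttempt(long timeoutMillis) {
        Thread attempt = new Thread(() -> {
            try {
                runAttempt();
            } catch (RuntimeException expected) {
                // In a real job, this failure would be handled by the configured restart strategy.
            }
        });
        attempt.start();
        try {
            boolean reached = runningLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            attempt.join();
            return reached;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        boolean reached = awaitFirstAttempt(1_000L);
        System.out.println("running observed: " + reached + ", attempts: " + attempts.get());
    }
}
```

The latch only establishes that an attempt has started running; it says nothing about the job status itself.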
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -231,6 +232,43 @@ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Ex
assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
}
+ @Test
+ public void testCancellationOfJobInRestartLoop() throws Exception {
+ final long timeInRestartingState = 10000L;
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // configure a high delay between attempts: We'll stay in RESTARTING for 10 seconds.
+ env.setRestartStrategy(
+ RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, timeInRestartingState));
+ env.addSource(new NotifyOnRunningAndFailingSource()).addSink(new DiscardingSink<>());
+
+ JobClient client = env.executeAsync();
+
+ NotifyOnRunningAndFailingSource.runningLatch.await();
+
+ // wait until we are in RESTARTING state
+ CommonTestUtils.waitUntilCondition(
+ () -> client.getJobStatus().get() == JobStatus.RESTARTING,
+ Deadline.fromNow(Duration.of(timeInRestartingState, ChronoUnit.MILLIS)),
+ 5);
+
+ // now cancel while in RESTARTING state
+ client.cancel().get();
+ }
Review comment:
Nit: Does this really have to live in the `flink-tests` module? I think this could be tested as an integration test in the `flink-runtime` module.
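The wait on `JobStatus.RESTARTING` in the test above uses `CommonTestUtils.waitUntilCondition` with a `Deadline` of `timeInRestartingState` (10 s); its last argument (`5`) is presumably a retry interval in milliseconds. As a rough illustration of that poll-until-deadline pattern, here is a plain-Java sketch. `PollUntilSketch` and `pollUntil` are hypothetical names, and the sketch simply returns `false` when the deadline expires, which may differ from how the Flink helper reports a timeout.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical poll-until-deadline helper, loosely modeled on the wait used in the test. */
class PollUntilSketch {

    /** Returns true as soon as the condition holds, false if the deadline passes first. */
    static boolean pollUntil(BooleanSupplier condition, Duration timeout, long retryIntervalMillis) {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            // Check the condition first, so an already-true condition returns without sleeping.
            if (condition.getAsBoolean()) {
                return true;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false;
            }
            try {
                Thread.sleep(retryIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // The condition becomes true after roughly 50 ms and is re-checked every 5 ms.
        boolean met = pollUntil(() -> System.nanoTime() - start > 50_000_000L, Duration.ofSeconds(10), 5L);
        System.out.println("condition met: " + met);
    }
}
```

Because the deadline only bounds how long the test waits to observe RESTARTING, a short retry interval keeps the observation latency low without affecting the job itself.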
--
This is an automated message from the Apache Git Service.