AHeise commented on a change in pull request #15589:
URL: https://github.com/apache/flink/pull/15589#discussion_r613873168
##########
File path: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
##########
@@ -97,6 +100,19 @@
private JobGraph jobGraph;
+    @After
+    public void tearDown() throws Exception {
+        for (JobStatusMessage path : miniClusterResource.getClusterClient().listJobs().get()) {
+            if (!path.getJobState().isTerminalState()) {
+                try {
+                    miniClusterResource.getClusterClient().cancel(path.getJobId()).get();
+                } catch (Exception ignored) {
+                    // ignore exceptions when cancelling dangling jobs
+                }
+            }
+        }
+    }
+
Review comment:
We should move this cleanup inside the resource, so tests don't each need their own `@After` hook.
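The suggestion above can be sketched roughly as follows. This is a minimal, self-contained illustration of the pattern, not Flink code: `FakeClusterClient`, `Job`, and `CancelDanglingJobsResource` are hypothetical stand-ins for Flink's `ClusterClient`, `JobStatusMessage`, and `MiniClusterResource`; in the real codebase the cleanup loop would live in the resource's teardown (e.g. a JUnit `ExternalResource#after()` override).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's JobStatusMessage.
class Job {
    final String id;
    boolean terminal;

    Job(String id) {
        this.id = id;
    }
}

// Hypothetical stand-in for Flink's ClusterClient.
class FakeClusterClient {
    private final List<Job> jobs = new ArrayList<>();

    Job submit(String id) {
        Job job = new Job(id);
        jobs.add(job);
        return job;
    }

    List<Job> listJobs() {
        return new ArrayList<>(jobs);
    }

    void cancel(Job job) {
        job.terminal = true;
    }

    List<Job> runningJobs() {
        List<Job> running = new ArrayList<>();
        for (Job job : jobs) {
            if (!job.terminal) {
                running.add(job);
            }
        }
        return running;
    }
}

// Sketch of the reviewer's suggestion: the resource itself cancels any
// non-terminal job during teardown, so every test gets cleanup for free.
class CancelDanglingJobsResource {
    private final FakeClusterClient client;

    CancelDanglingJobsResource(FakeClusterClient client) {
        this.client = client;
    }

    // In JUnit this would override ExternalResource#after().
    void after() {
        for (Job job : client.listJobs()) {
            if (!job.terminal) {
                try {
                    client.cancel(job);
                } catch (Exception ignored) {
                    // best-effort cleanup: ignore failures for dangling jobs
                }
            }
        }
    }
}
```

With this in place, a test that leaves a job running no longer leaks it into the next test, and the per-test `@After` method becomes unnecessary.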
##########
File path: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
##########
@@ -242,6 +258,27 @@ public void testRestartCheckpointCoordinatorIfStopWithSavepointFails() throws Ex
assertTrue(checkpointsToWaitFor.await(60L, TimeUnit.SECONDS));
}
+    private void stopWithSavepointAndWait(boolean terminate)
+            throws InterruptedException, ExecutionException {
+        final int attempts = 15;
+        for (int i = 0; i < attempts; i++) {
+            try {
+                stopWithSavepoint(terminate).get();
+                return;
+            } catch (Exception ex) {
+                Optional<CheckpointException> checkpointExceptionOptional =
+                        ExceptionUtils.findThrowable(ex, CheckpointException.class);
+                if (!checkpointExceptionOptional.isPresent()
+                        || checkpointExceptionOptional.get().getCheckpointFailureReason()
+                                != NOT_ALL_REQUIRED_TASKS_RUNNING) {
+                    throw ex;
+                }
+                // back off before triggering savepoint again
+                Thread.sleep(100);
+            }
+        }
+    }
+
Review comment:
When do we need so many tries? Why wouldn't it work on the first try if we know that everything is already running?
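For context, the retry pattern under discussion can be generalized as below. This is a standalone sketch, not Flink API: the names `retryOnTransientFailure` and `isTransient` are illustrative. The key property it demonstrates is the one the PR's helper also has: retry only on a specific transient failure (in the PR, `NOT_ALL_REQUIRED_TASKS_RUNNING`) and rethrow anything else immediately.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Generalized sketch of retry-with-backoff on a transient failure.
// Names are illustrative, not taken from the Flink codebase.
class Retry {
    static <T> T retryOnTransientFailure(
            Callable<T> action,
            Predicate<Exception> isTransient,
            int attempts,
            long backoffMillis)
            throws Exception {
        Exception last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return action.call();
            } catch (Exception ex) {
                if (!isTransient.test(ex)) {
                    // non-transient failure: fail fast, as the PR does
                    throw ex;
                }
                last = ex;
                // back off before retrying the action
                Thread.sleep(backoffMillis);
            }
        }
        // unlike the PR's helper, surface the failure when attempts run out
        throw last;
    }
}
```

Note that the PR's loop silently returns after exhausting all 15 attempts; the sketch above instead rethrows the last transient exception, which is usually the safer choice in a test helper.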
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]