rmetzger commented on a change in pull request #13604:
URL: https://github.com/apache/flink/pull/13604#discussion_r504915299



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
##########
@@ -107,28 +119,55 @@ public void test() throws Exception {
        }
 
        private Tuple2<String, Map<String, Object>> runAndTakeSavepoint() 
throws Exception {
-               JobClient jobClient = submitJobInitially(env(startAligned, 0, 
emptyMap()));
-               Thread.sleep(FIRST_RUN_EL_COUNT * FIRST_RUN_BACKPRESSURE_MS); 
// wait for all tasks to run and some backpressure from sink
-               Future<Map<String, Object>> accFuture = 
jobClient.getAccumulators();
-               Future<String> savepointFuture = 
jobClient.stopWithSavepoint(false, tempFolder().toURI().toString());
-               return new Tuple2<>(savepointFuture.get(), accFuture.get());
+               return withCluster(new Configuration(), miniCluster -> {
+                       JobClient jobClient = 
submitJobInitially(env(startAligned, 0));
+                       waitForAllTaskRunning(miniCluster, 
jobClient.getJobID(), Deadline.fromNow(Duration.of(30, ChronoUnit.SECONDS)));
+                       Thread.sleep(FIRST_RUN_BACKPRESSURE_MS); // wait for 
some backpressure from sink

Review comment:
       Maybe such utilities should also be made available to the user, for 
writing their own tests.
   There are a lot of very useful utilities everywhere in our tests, but it's 
difficult to discover them. Ideally we have them categorized and standardized 
somewhere.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to