rmetzger commented on a change in pull request #13604:
URL: https://github.com/apache/flink/pull/13604#discussion_r504756543



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
##########
@@ -107,28 +119,55 @@ public void test() throws Exception {
        }
 
        private Tuple2<String, Map<String, Object>> runAndTakeSavepoint() 
throws Exception {
-               JobClient jobClient = submitJobInitially(env(startAligned, 0, 
emptyMap()));
-               Thread.sleep(FIRST_RUN_EL_COUNT * FIRST_RUN_BACKPRESSURE_MS); 
// wait for all tasks to run and some backpressure from sink
-               Future<Map<String, Object>> accFuture = 
jobClient.getAccumulators();
-               Future<String> savepointFuture = 
jobClient.stopWithSavepoint(false, tempFolder().toURI().toString());
-               return new Tuple2<>(savepointFuture.get(), accFuture.get());
+               return withCluster(new Configuration(), miniCluster -> {
+                       JobClient jobClient = 
submitJobInitially(env(startAligned, 0));
+                       waitForAllTaskRunning(miniCluster, 
jobClient.getJobID(), Deadline.fromNow(Duration.of(30, ChronoUnit.SECONDS)));
+                       Thread.sleep(FIRST_RUN_BACKPRESSURE_MS); // wait for 
some backpressure from sink

Review comment:
       It's generally an anti-pattern in tests to "just sleep for a while". 
Can't you use the backpressure monitoring to wait until there is backpressure? 
(or would that take longer?)

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
##########
@@ -107,28 +119,55 @@ public void test() throws Exception {
        }
 
        private Tuple2<String, Map<String, Object>> runAndTakeSavepoint() 
throws Exception {
-               JobClient jobClient = submitJobInitially(env(startAligned, 0, 
emptyMap()));
-               Thread.sleep(FIRST_RUN_EL_COUNT * FIRST_RUN_BACKPRESSURE_MS); 
// wait for all tasks to run and some backpressure from sink
-               Future<Map<String, Object>> accFuture = 
jobClient.getAccumulators();
-               Future<String> savepointFuture = 
jobClient.stopWithSavepoint(false, tempFolder().toURI().toString());
-               return new Tuple2<>(savepointFuture.get(), accFuture.get());
+               return withCluster(new Configuration(), miniCluster -> {
+                       JobClient jobClient = 
submitJobInitially(env(startAligned, 0));
+                       waitForAllTaskRunning(miniCluster, 
jobClient.getJobID(), Deadline.fromNow(Duration.of(30, ChronoUnit.SECONDS)));
+                       Thread.sleep(FIRST_RUN_BACKPRESSURE_MS); // wait for 
some backpressure from sink
+
+                       Future<Map<String, Object>> accFuture = 
jobClient.getAccumulators();
+                       Future<String> savepointFuture = 
jobClient.stopWithSavepoint(false, tempFolder().toURI().toString());
+                       return new Tuple2<>(savepointFuture.get(), 
accFuture.get());
+               });
        }
 
        private Tuple2<String, Map<String, Object>> 
runAndTakeExternalCheckpoint() throws Exception {
                File folder = tempFolder();
-               JobClient jobClient = 
submitJobInitially(externalCheckpointEnv(startAligned, folder, 100));
-               File metadata = 
waitForChild(waitForChild(waitForChild(folder))); // structure: 
root/attempt/checkpoint/_metadata
-               cancelJob(jobClient);
-               return new Tuple2<>(metadata.getParentFile().toString(), 
emptyMap());
+               final Configuration conf = new Configuration();
+               conf.set(CHECKPOINTS_DIRECTORY, folder.toURI().toString());
+               // prevent deletion of checkpoint files while it's being 
checked and used
+               conf.set(MAX_RETAINED_CHECKPOINTS, Integer.MAX_VALUE);
+               return withCluster(conf,
+                       miniCluster -> {
+                               JobClient jobClient = 
submitJobInitially(externalCheckpointEnv(startAligned, 100));
+                               File metadata = 
waitForChild(waitForChild(waitForChild(folder))); // structure: 
root/attempt/checkpoint/_metadata
+                               cancelJob(jobClient);
+                               return new 
Tuple2<>(metadata.getParentFile().toString(), emptyMap());
+                       });
        }
 
        private static JobClient submitJobInitially(StreamExecutionEnvironment 
env) throws Exception {
                return env.executeAsync(dag(FIRST_RUN_EL_COUNT, true, 
FIRST_RUN_BACKPRESSURE_MS, env));
        }
 
        private Map<String, Object> runFromSavepoint(String path, boolean 
isAligned, int totalCount) throws Exception {
-               StreamExecutionEnvironment env = env(isAligned, 50, 
Collections.singletonMap(SAVEPOINT_PATH, path));
-               return env.execute(dag(totalCount, false, 0, 
env)).getJobExecutionResult().getAllAccumulatorResults();
+               return withCluster(new Configuration(), miniCluster -> {
+                       StreamExecutionEnvironment env = env(isAligned, 50);
+                       final StreamGraph streamGraph = dag(totalCount, false, 
0, env);
+                       
streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(path));
+                       return 
env.execute(streamGraph).getJobExecutionResult().getAllAccumulatorResults();
+               });
+       }
+
+       private <T> T withCluster(Configuration configuration,

Review comment:
       Why are you not reusing the cluster for the different tests / jobs?

##########
File path: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
##########
@@ -102,6 +116,24 @@ private static void verifyJvmOptions() {
                                + "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
        }
 
+       public static void waitForAllTaskRunning(MiniClusterResource 
miniCluster, JobID jobID, Deadline deadline) throws Exception {
+               try (final RestClusterClient<?> clusterClient = new 
RestClusterClient<Object>(
+                               miniCluster.getClientConfiguration(),
+                               StandaloneClusterId.getInstance())) {
+                       JobMessageParameters params = new 
JobMessageParameters();
+                       params.jobPathParameter.resolve(jobID);
+                       
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(() -> {
+                               final JobDetailsInfo jobDetailsInfo = 
clusterClient.sendRequest(
+                                       JobDetailsHeaders.getInstance(),
+                                       params,
+                                       EmptyRequestBody.getInstance()).get();
+                               return jobDetailsInfo.getJobStatus() == 
JobStatus.RUNNING &&
+                                       
jobDetailsInfo.getJobVerticesPerState().get(ExecutionState.RUNNING) ==
+                                               
jobDetailsInfo.getJobVertexInfos().size();
+                       }, deadline, 500);
+               }
+       }
+

Review comment:
       Not sure about the code formatting, and I haven't tested it much. But I 
guess you get the idea.

##########
File path: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
##########
@@ -102,6 +116,24 @@ private static void verifyJvmOptions() {
                                + "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
        }
 
+       public static void waitForAllTaskRunning(MiniClusterResource 
miniCluster, JobID jobID, Deadline deadline) throws Exception {
+               try (final RestClusterClient<?> clusterClient = new 
RestClusterClient<Object>(
+                               miniCluster.getClientConfiguration(),
+                               StandaloneClusterId.getInstance())) {
+                       JobMessageParameters params = new 
JobMessageParameters();
+                       params.jobPathParameter.resolve(jobID);
+                       
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(() -> {
+                               final JobDetailsInfo jobDetailsInfo = 
clusterClient.sendRequest(
+                                       JobDetailsHeaders.getInstance(),
+                                       params,
+                                       EmptyRequestBody.getInstance()).get();
+                               return jobDetailsInfo.getJobStatus() == 
JobStatus.RUNNING &&
+                                       
jobDetailsInfo.getJobVerticesPerState().get(ExecutionState.RUNNING) ==
+                                               
jobDetailsInfo.getJobVertexInfos().size();
+                       }, deadline, 500);
+               }
+       }
+

Review comment:
       ```suggestion
        public static void waitForAllTaskRunning(MiniCluster miniCluster, JobID 
jobID, Deadline deadline) throws Exception {
                
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(() -> {
                        AccessExecutionGraph ec = 
miniCluster.getExecutionGraph(jobID).get();
                        return ec.getState() == JobStatus.RUNNING && 
ec.getAllVertices()
                                .values()
                                .stream()
                                .allMatch(jobVertex ->
                                        
Arrays.stream(jobVertex.getTaskVertices()).allMatch(task -> 
task.getExecutionState() == ExecutionState.RUNNING)
                                );
                }, deadline);
        }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to