rmetzger commented on a change in pull request #13604:
URL: https://github.com/apache/flink/pull/13604#discussion_r504756543
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
##########
@@ -107,28 +119,55 @@ public void test() throws Exception {
}
    private Tuple2<String, Map<String, Object>> runAndTakeSavepoint() throws Exception {
-       JobClient jobClient = submitJobInitially(env(startAligned, 0, emptyMap()));
-       Thread.sleep(FIRST_RUN_EL_COUNT * FIRST_RUN_BACKPRESSURE_MS); // wait for all tasks to run and some backpressure from sink
-       Future<Map<String, Object>> accFuture = jobClient.getAccumulators();
-       Future<String> savepointFuture = jobClient.stopWithSavepoint(false, tempFolder().toURI().toString());
-       return new Tuple2<>(savepointFuture.get(), accFuture.get());
+       return withCluster(new Configuration(), miniCluster -> {
+           JobClient jobClient = submitJobInitially(env(startAligned, 0));
+           waitForAllTaskRunning(miniCluster, jobClient.getJobID(), Deadline.fromNow(Duration.of(30, ChronoUnit.SECONDS)));
+           Thread.sleep(FIRST_RUN_BACKPRESSURE_MS); // wait for some backpressure from sink
Review comment:
It's generally an anti-pattern in tests to "just sleep for a while".
Can't you use the backpressure monitoring to wait until there is backpressure?
(or would that take longer?)
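Something along these lines could replace the fixed sleep (rough, untested sketch; `isSinkBackPressured` is only a placeholder for whatever backpressure check we end up using, e.g. polling the REST backpressure stats for the sink vertex). It reuses the same `org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition` already used below:
```java
// Poll until backpressure is actually observed instead of sleeping a fixed time.
// isSinkBackPressured(...) is a hypothetical helper, e.g. one that queries the
// job's backpressure stats for the sink vertex via the REST API.
CommonTestUtils.waitUntilCondition(
        () -> isSinkBackPressured(miniCluster, jobClient.getJobID()),
        Deadline.fromNow(Duration.of(30, ChronoUnit.SECONDS)),
        500);
```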
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
##########
@@ -107,28 +119,55 @@ public void test() throws Exception {
}
    private Tuple2<String, Map<String, Object>> runAndTakeSavepoint() throws Exception {
-       JobClient jobClient = submitJobInitially(env(startAligned, 0, emptyMap()));
-       Thread.sleep(FIRST_RUN_EL_COUNT * FIRST_RUN_BACKPRESSURE_MS); // wait for all tasks to run and some backpressure from sink
-       Future<Map<String, Object>> accFuture = jobClient.getAccumulators();
-       Future<String> savepointFuture = jobClient.stopWithSavepoint(false, tempFolder().toURI().toString());
-       return new Tuple2<>(savepointFuture.get(), accFuture.get());
+       return withCluster(new Configuration(), miniCluster -> {
+           JobClient jobClient = submitJobInitially(env(startAligned, 0));
+           waitForAllTaskRunning(miniCluster, jobClient.getJobID(), Deadline.fromNow(Duration.of(30, ChronoUnit.SECONDS)));
+           Thread.sleep(FIRST_RUN_BACKPRESSURE_MS); // wait for some backpressure from sink
+
+           Future<Map<String, Object>> accFuture = jobClient.getAccumulators();
+           Future<String> savepointFuture = jobClient.stopWithSavepoint(false, tempFolder().toURI().toString());
+           return new Tuple2<>(savepointFuture.get(), accFuture.get());
+       });
    }
    private Tuple2<String, Map<String, Object>> runAndTakeExternalCheckpoint() throws Exception {
        File folder = tempFolder();
-       JobClient jobClient = submitJobInitially(externalCheckpointEnv(startAligned, folder, 100));
-       File metadata = waitForChild(waitForChild(waitForChild(folder))); // structure: root/attempt/checkpoint/_metadata
-       cancelJob(jobClient);
-       return new Tuple2<>(metadata.getParentFile().toString(), emptyMap());
+       final Configuration conf = new Configuration();
+       conf.set(CHECKPOINTS_DIRECTORY, folder.toURI().toString());
+       // prevent deletion of checkpoint files while it's being checked and used
+       conf.set(MAX_RETAINED_CHECKPOINTS, Integer.MAX_VALUE);
+       return withCluster(conf,
+           miniCluster -> {
+               JobClient jobClient = submitJobInitially(externalCheckpointEnv(startAligned, 100));
+               File metadata = waitForChild(waitForChild(waitForChild(folder))); // structure: root/attempt/checkpoint/_metadata
+               cancelJob(jobClient);
+               return new Tuple2<>(metadata.getParentFile().toString(), emptyMap());
+           });
    }
    private static JobClient submitJobInitially(StreamExecutionEnvironment env) throws Exception {
        return env.executeAsync(dag(FIRST_RUN_EL_COUNT, true, FIRST_RUN_BACKPRESSURE_MS, env));
    }
    private Map<String, Object> runFromSavepoint(String path, boolean isAligned, int totalCount) throws Exception {
-       StreamExecutionEnvironment env = env(isAligned, 50, Collections.singletonMap(SAVEPOINT_PATH, path));
-       return env.execute(dag(totalCount, false, 0, env)).getJobExecutionResult().getAllAccumulatorResults();
+       return withCluster(new Configuration(), miniCluster -> {
+           StreamExecutionEnvironment env = env(isAligned, 50);
+           final StreamGraph streamGraph = dag(totalCount, false, 0, env);
+           streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(path));
+           return env.execute(streamGraph).getJobExecutionResult().getAllAccumulatorResults();
+       });
+   }
+
+   private <T> T withCluster(Configuration configuration,
Review comment:
Why are you not reusing the cluster for the different tests / jobs?
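For example, a single shared cluster via a class rule would look roughly like this (sketch; the slot count would need to match the job's parallelism, and the per-run checkpoint options would then have to be set elsewhere):
```java
// Hypothetical shared cluster for all runs in this test class (sketch).
@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER =
        new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(4) // assumption: enough slots for the dag
                        .build());
```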
##########
File path:
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
##########
@@ -102,6 +116,24 @@ private static void verifyJvmOptions() {
+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
}
+   public static void waitForAllTaskRunning(MiniClusterResource miniCluster, JobID jobID, Deadline deadline) throws Exception {
+       try (final RestClusterClient<?> clusterClient = new RestClusterClient<Object>(
+               miniCluster.getClientConfiguration(),
+               StandaloneClusterId.getInstance())) {
+           JobMessageParameters params = new JobMessageParameters();
+           params.jobPathParameter.resolve(jobID);
+           org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(() -> {
+               final JobDetailsInfo jobDetailsInfo = clusterClient.sendRequest(
+                   JobDetailsHeaders.getInstance(),
+                   params,
+                   EmptyRequestBody.getInstance()).get();
+               return jobDetailsInfo.getJobStatus() == JobStatus.RUNNING &&
+                   jobDetailsInfo.getJobVerticesPerState().get(ExecutionState.RUNNING) ==
+                       jobDetailsInfo.getJobVertexInfos().size();
+           }, deadline, 500);
+       }
+   }
+
Review comment:
Not sure about the code formatting, and I haven't tested it much. But I
guess you get the idea.
##########
File path:
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
##########
@@ -102,6 +116,24 @@ private static void verifyJvmOptions() {
+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
}
+   public static void waitForAllTaskRunning(MiniClusterResource miniCluster, JobID jobID, Deadline deadline) throws Exception {
+       try (final RestClusterClient<?> clusterClient = new RestClusterClient<Object>(
+               miniCluster.getClientConfiguration(),
+               StandaloneClusterId.getInstance())) {
+           JobMessageParameters params = new JobMessageParameters();
+           params.jobPathParameter.resolve(jobID);
+           org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(() -> {
+               final JobDetailsInfo jobDetailsInfo = clusterClient.sendRequest(
+                   JobDetailsHeaders.getInstance(),
+                   params,
+                   EmptyRequestBody.getInstance()).get();
+               return jobDetailsInfo.getJobStatus() == JobStatus.RUNNING &&
+                   jobDetailsInfo.getJobVerticesPerState().get(ExecutionState.RUNNING) ==
+                       jobDetailsInfo.getJobVertexInfos().size();
+           }, deadline, 500);
+       }
+   }
+
Review comment:
```suggestion
    public static void waitForAllTaskRunning(MiniCluster miniCluster, JobID jobID, Deadline deadline) throws Exception {
        org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(() -> {
            AccessExecutionGraph ec = miniCluster.getExecutionGraph(jobID).get();
            return ec.getState() == JobStatus.RUNNING &&
                ec.getAllVertices()
                    .values()
                    .stream()
                    .allMatch(jobVertex ->
                        Arrays.stream(jobVertex.getTaskVertices()).allMatch(task -> task.getExecutionState() == ExecutionState.RUNNING)
                    );
        }, deadline);
    }
```
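The call site in the ITCase would then look roughly like this (untested sketch, assuming `MiniClusterResource#getMiniCluster()` gives access to the underlying `MiniCluster`):
```java
// Hypothetical call site, mirroring the existing wait in the ITCase (sketch).
waitForAllTaskRunning(
        miniCluster.getMiniCluster(), // assumption: the resource exposes its MiniCluster
        jobClient.getJobID(),
        Deadline.fromNow(Duration.of(30, ChronoUnit.SECONDS)));
```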
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]