AHeise commented on a change in pull request #13604:
URL: https://github.com/apache/flink/pull/13604#discussion_r504911666
##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
##########
@@ -107,28 +119,55 @@ public void test() throws Exception {
     }

     private Tuple2<String, Map<String, Object>> runAndTakeSavepoint() throws Exception {
-        JobClient jobClient = submitJobInitially(env(startAligned, 0, emptyMap()));
-        Thread.sleep(FIRST_RUN_EL_COUNT * FIRST_RUN_BACKPRESSURE_MS); // wait for all tasks to run and some backpressure from sink
-        Future<Map<String, Object>> accFuture = jobClient.getAccumulators();
-        Future<String> savepointFuture = jobClient.stopWithSavepoint(false, tempFolder().toURI().toString());
-        return new Tuple2<>(savepointFuture.get(), accFuture.get());
+        return withCluster(new Configuration(), miniCluster -> {
+            JobClient jobClient = submitJobInitially(env(startAligned, 0));
+            waitForAllTaskRunning(miniCluster, jobClient.getJobID(), Deadline.fromNow(Duration.of(30, ChronoUnit.SECONDS)));
+            Thread.sleep(FIRST_RUN_BACKPRESSURE_MS); // wait for some backpressure from sink
+
+            Future<Map<String, Object>> accFuture = jobClient.getAccumulators();
+            Future<String> savepointFuture = jobClient.stopWithSavepoint(false, tempFolder().toURI().toString());
+            return new Tuple2<>(savepointFuture.get(), accFuture.get());
+        });
     }

     private Tuple2<String, Map<String, Object>> runAndTakeExternalCheckpoint() throws Exception {
         File folder = tempFolder();
-        JobClient jobClient = submitJobInitially(externalCheckpointEnv(startAligned, folder, 100));
-        File metadata = waitForChild(waitForChild(waitForChild(folder))); // structure: root/attempt/checkpoint/_metadata
-        cancelJob(jobClient);
-        return new Tuple2<>(metadata.getParentFile().toString(), emptyMap());
+        final Configuration conf = new Configuration();
+        conf.set(CHECKPOINTS_DIRECTORY, folder.toURI().toString());
+        // prevent deletion of checkpoint files while it's being checked and used
+        conf.set(MAX_RETAINED_CHECKPOINTS, Integer.MAX_VALUE);
+        return withCluster(conf,
+            miniCluster -> {
+                JobClient jobClient = submitJobInitially(externalCheckpointEnv(startAligned, 100));
+                File metadata = waitForChild(waitForChild(waitForChild(folder))); // structure: root/attempt/checkpoint/_metadata
+                cancelJob(jobClient);
+                return new Tuple2<>(metadata.getParentFile().toString(), emptyMap());
+            });
     }

     private static JobClient submitJobInitially(StreamExecutionEnvironment env) throws Exception {
         return env.executeAsync(dag(FIRST_RUN_EL_COUNT, true, FIRST_RUN_BACKPRESSURE_MS, env));
     }

     private Map<String, Object> runFromSavepoint(String path, boolean isAligned, int totalCount) throws Exception {
-        StreamExecutionEnvironment env = env(isAligned, 50, Collections.singletonMap(SAVEPOINT_PATH, path));
-        return env.execute(dag(totalCount, false, 0, env)).getJobExecutionResult().getAllAccumulatorResults();
+        return withCluster(new Configuration(), miniCluster -> {
+            StreamExecutionEnvironment env = env(isAligned, 50);
+            final StreamGraph streamGraph = dag(totalCount, false, 0, env);
+            streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(path));
+            return env.execute(streamGraph).getJobExecutionResult().getAllAccumulatorResults();
+        });
+    }
+
+    private <T> T withCluster(Configuration configuration,
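The hunk ends mid-signature. For orientation, a helper of this shape could look roughly like the sketch below; apart from the withCluster name, the types and cluster setup are assumptions rather than the PR's actual code:

    // Sketch only -- assumed shape of the truncated helper, not the PR's code.
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.minicluster.MiniCluster;
    import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
    import org.apache.flink.test.util.MiniClusterWithClientResource;
    import org.apache.flink.util.function.FunctionWithException;

    private <T> T withCluster(
            Configuration configuration,
            FunctionWithException<MiniCluster, T, Exception> action) throws Exception {
        // Build a fresh MiniCluster for this invocation, honoring the caller's config.
        MiniClusterWithClientResource cluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(configuration)
                                .setNumberTaskManagers(1)
                                .setNumberSlotsPerTaskManager(4) // slot count is an assumption
                                .build());
        cluster.before(); // start the cluster
        try {
            return action.apply(cluster.getMiniCluster());
        } finally {
            cluster.after(); // always shut the cluster down, even on failure
        }
    }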
Review comment:
Good point. Originally, I needed to inject the configuration to restore
from savepoints, but that didn't work, so I can probably leave it at one
cluster per test. A global cluster would probably also work; I just need to
make the TempFolder a class rule.
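For reference, the class-rule variant mentioned above could look roughly like this; the rules are standard JUnit 4 and Flink test utilities, but the field names and slot count are assumptions:

    import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
    import org.apache.flink.test.util.MiniClusterWithClientResource;
    import org.junit.ClassRule;
    import org.junit.rules.TemporaryFolder;

    // Shared by all test methods: the folder outlives each test, so savepoint
    // and checkpoint paths remain valid for the whole class.
    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    // One MiniCluster for the whole test class instead of one per test.
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(4)
                            .build());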
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]