Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5715#discussion_r176663967
--- Diff:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
---
@@ -528,54 +454,44 @@ public void
testSavepointRescalingPartitionedOperatorState(boolean scaleOut, Ope
}
try {
- jobManager =
cluster.getLeaderGateway(deadline.timeLeft());
-
JobGraph jobGraph =
createJobGraphWithOperatorState(parallelism, maxParallelism, checkpointMethod);
- jobID = jobGraph.getJobID();
-
- cluster.submitJobDetached(jobGraph);
+ final JobID jobID = jobGraph.getJobID();
- Object savepointResponse = null;
+ client.setDetached(true);
+ client.submitJob(jobGraph,
RescalingITCase.class.getClassLoader());
// wait until the operator is started
StateSourceBase.workStartedLatch.await();
- while (deadline.hasTimeLeft()) {
-
- Future<Object> savepointPathFuture =
jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID,
Option.<String>empty()), deadline.timeLeft());
- FiniteDuration waitingTime = new
FiniteDuration(10, TimeUnit.SECONDS);
- savepointResponse =
Await.result(savepointPathFuture, waitingTime);
-
- if (savepointResponse instanceof
JobManagerMessages.TriggerSavepointSuccess) {
- break;
- }
- }
-
- assertTrue(savepointResponse instanceof
JobManagerMessages.TriggerSavepointSuccess);
-
- final String savepointPath =
((JobManagerMessages.TriggerSavepointSuccess)
savepointResponse).savepointPath();
-
- Future<Object> jobRemovedFuture = jobManager.ask(new
TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
-
- Future<Object> cancellationResponseFuture =
jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
-
- Object cancellationResponse =
Await.result(cancellationResponseFuture, deadline.timeLeft());
+ CompletableFuture<String> savepointPathFuture =
FutureUtils.retryWithDelay(
+ () -> {
+ try {
+ return
client.triggerSavepoint(jobID, null);
+ } catch (FlinkException e) {
+ throw new RuntimeException(e);
--- End diff --
yes, I'll change it and merge afterwards.
---