[
https://issues.apache.org/jira/browse/FLINK-8956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410993#comment-16410993
]
ASF GitHub Bot commented on FLINK-8956:
---------------------------------------
GitHub user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5715#discussion_r176663967
--- Diff:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
---
@@ -528,54 +454,44 @@ public void
testSavepointRescalingPartitionedOperatorState(boolean scaleOut, Ope
}
try {
- jobManager =
cluster.getLeaderGateway(deadline.timeLeft());
-
JobGraph jobGraph =
createJobGraphWithOperatorState(parallelism, maxParallelism, checkpointMethod);
- jobID = jobGraph.getJobID();
-
- cluster.submitJobDetached(jobGraph);
+ final JobID jobID = jobGraph.getJobID();
- Object savepointResponse = null;
+ client.setDetached(true);
+ client.submitJob(jobGraph,
RescalingITCase.class.getClassLoader());
// wait until the operator is started
StateSourceBase.workStartedLatch.await();
- while (deadline.hasTimeLeft()) {
-
- Future<Object> savepointPathFuture =
jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID,
Option.<String>empty()), deadline.timeLeft());
- FiniteDuration waitingTime = new
FiniteDuration(10, TimeUnit.SECONDS);
- savepointResponse =
Await.result(savepointPathFuture, waitingTime);
-
- if (savepointResponse instanceof
JobManagerMessages.TriggerSavepointSuccess) {
- break;
- }
- }
-
- assertTrue(savepointResponse instanceof
JobManagerMessages.TriggerSavepointSuccess);
-
- final String savepointPath =
((JobManagerMessages.TriggerSavepointSuccess)
savepointResponse).savepointPath();
-
- Future<Object> jobRemovedFuture = jobManager.ask(new
TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
-
- Future<Object> cancellationResponseFuture =
jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
-
- Object cancellationResponse =
Await.result(cancellationResponseFuture, deadline.timeLeft());
+ CompletableFuture<String> savepointPathFuture =
FutureUtils.retryWithDelay(
+ () -> {
+ try {
+ return
client.triggerSavepoint(jobID, null);
+ } catch (FlinkException e) {
+ throw new RuntimeException(e);
--- End diff --
Yes, I'll change it and merge afterwards.
> Port RescalingITCase to flip6
> -----------------------------
>
> Key: FLINK-8956
> URL: https://issues.apache.org/jira/browse/FLINK-8956
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Blocker
> Fix For: 1.5.0
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)