[
https://issues.apache.org/jira/browse/FLINK-8703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16406287#comment-16406287
]
ASF GitHub Bot commented on FLINK-8703:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5701#discussion_r175755400
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
@@ -194,49 +187,35 @@ protected final void restoreAndExecute(
String savepointPath,
Tuple2<String, Integer>... expectedAccumulators) throws Exception {
- // Retrieve the job manager
- Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+ ClusterClient<?> client = miniClusterResource.getClusterClient();
+ client.setDetached(true);
// Submit the job
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
- JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
-
- StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
- JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
+ JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
boolean done = false;
while (DEADLINE.hasTimeLeft()) {
// try and get a job result, this will fail if the job already failed. Use this
// to get out of this loop
JobID jobId = jobSubmissionResult.getJobID();
- FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
try {
+ CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());
- Future<Object> future = clusterClient
- .getJobManagerGateway()
- .ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
-
- Object result = Await.result(future, timeout);
+ JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);
- if (result instanceof JobManagerMessages.CurrentJobStatus) {
- if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
- Object jobResult = Await.result(
- jobListeningContext.getJobResultFuture(),
- Duration.apply(5, TimeUnit.SECONDS));
- fail("Job failed: " + jobResult);
- }
- }
+ assertNotEquals(JobStatus.FAILED, jobStatus);
} catch (Exception e) {
fail("Could not connect to job: " + e);
}
Thread.sleep(100);
--- End diff --
True, you're right. Sorry I didn't look closely enough at the test case.
Forget my comments here :-)
> Migrate tests from LocalFlinkMiniCluster to MiniClusterResource
> ---------------------------------------------------------------
>
> Key: FLINK-8703
> URL: https://issues.apache.org/jira/browse/FLINK-8703
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Reporter: Aljoscha Krettek
> Assignee: Chesnay Schepler
> Priority: Blocker
> Fix For: 1.5.0
>
>