[
https://issues.apache.org/jira/browse/FLINK-8778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376764#comment-16376764
]
ASF GitHub Bot commented on FLINK-8778:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5579#discussion_r170572908
--- Diff:
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
---
@@ -304,47 +294,17 @@ public Integer getKey(Tuple2<Integer, Long> value) {
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
final JobID jobId = jobGraph.getJobID();
- final CompletableFuture<TestingJobManagerMessages.JobStatusIs>
failedFuture =
- notifyWhenJobStatusIs(jobId, JobStatus.FAILED,
deadline);
-
- final CompletableFuture<TestingJobManagerMessages.JobStatusIs>
cancellationFuture =
- notifyWhenJobStatusIs(jobId,
JobStatus.CANCELED, deadline);
-
- cluster.submitJobDetached(jobGraph);
-
- try {
- final TestingJobManagerMessages.JobStatusIs jobStatus =
-
failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
- assertEquals(JobStatus.FAILED, jobStatus.state());
- } catch (Exception e) {
-
- // if the assertion fails, it means that the job was
(falsely) not cancelled.
- // in this case, and given that the mini-cluster is
shared with other tests,
- // we cancel the job and wait for the cancellation so
that the resources are freed.
-
- if (jobId != null) {
- cluster.getLeaderGateway(deadline.timeLeft())
- .ask(new
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
-
-
cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- }
+ clusterClient.submitJob(jobGraph,
AbstractQueryableStateTestBase.class.getClassLoader());
- // and we re-throw the exception.
- throw e;
- }
+ CompletableFuture<JobResult> jobResultFuture =
clusterClient.requestJobResult(jobId);
- // Get the job and check the cause
- JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
- cluster.getLeaderGateway(deadline.timeLeft())
- .ask(new
JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
-
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
- .get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS);
+ JobResult jobResult = jobResultFuture.get();
+ assertFalse(jobResult.isSuccess());
- String failureCause =
jobFound.executionGraph().getFailureInfo().getExceptionAsString();
+ String failureCause =
jobResult.getSerializedThrowable().get().getFullStringifiedStackTrace();
- assertEquals(JobStatus.FAILED,
jobFound.executionGraph().getState());
+ CompletableFuture<JobStatus> jobStatusFuture =
clusterClient.getJobStatus(jobId);
+ assertEquals(JobStatus.FAILED, jobStatusFuture.get());
--- End diff --
add timeout
> Migrate queryable state ITCases to use MiniClusterResource
> ----------------------------------------------------------
>
> Key: FLINK-8778
> URL: https://issues.apache.org/jira/browse/FLINK-8778
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.5.0
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)