[ https://issues.apache.org/jira/browse/FLINK-8778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376766#comment-16376766 ]

ASF GitHub Bot commented on FLINK-8778:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5579#discussion_r170572839
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
    @@ -304,47 +294,17 @@ public Integer getKey(Tuple2<Integer, Long> value) {
                final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
                final JobID jobId = jobGraph.getJobID();
     
    -           final CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture =
    -                           notifyWhenJobStatusIs(jobId, JobStatus.FAILED, deadline);
    -
    -           final CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture =
    -                           notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline);
    -
    -           cluster.submitJobDetached(jobGraph);
    -
    -           try {
    -                   final TestingJobManagerMessages.JobStatusIs jobStatus =
    -                                   failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
    -
    -                   assertEquals(JobStatus.FAILED, jobStatus.state());
    -           } catch (Exception e) {
    -
    -                   // if the assertion fails, it means that the job was (falsely) not cancelled.
    -                   // in this case, and given that the mini-cluster is shared with other tests,
    -                   // we cancel the job and wait for the cancellation so that the resources are freed.
    -
    -                   if (jobId != null) {
    -                           cluster.getLeaderGateway(deadline.timeLeft())
    -                                           .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
    -                                           .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
    -
    -                           cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
    -                   }
    +           clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader());
     
    -                   // and we re-throw the exception.
    -                   throw e;
    -           }
    +           CompletableFuture<JobResult> jobResultFuture = clusterClient.requestJobResult(jobId);
     
    -           // Get the job and check the cause
    -           JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
    -                           cluster.getLeaderGateway(deadline.timeLeft())
    -                                           .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
    -                                           .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
    -                           .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
    +           JobResult jobResult = jobResultFuture.get();
    --- End diff --
    
    This should have a timeout. If the timeout is hit, we should try to cancel 
the job.
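
One way the suggestion above could look, as a minimal sketch rather than the actual change made in the pull request: wait on the result future with a bounded `get(timeout, unit)`, and on `TimeoutException` attempt a cancellation before re-throwing. The `JobClient` interface, the `awaitResultOrCancel` helper, and the use of a `String` job id are hypothetical stand-ins for illustration; they are not Flink's `ClusterClient` API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedJobWait {

    /** Hypothetical stand-in for a cluster client; NOT Flink's real API. */
    public interface JobClient<R> {
        CompletableFuture<R> requestJobResult(String jobId);

        CompletableFuture<Void> cancel(String jobId);
    }

    /**
     * Waits at most timeoutMillis for the job result. If the wait times out,
     * tries to cancel the job (so a shared cluster is not left with a running
     * job) and then re-throws the original TimeoutException.
     */
    public static <R> R awaitResultOrCancel(JobClient<R> client, String jobId, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<R> resultFuture = client.requestJobResult(jobId);
        try {
            // Bounded wait instead of an unbounded get().
            return resultFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timeout) {
            try {
                // Best-effort cancellation, itself bounded by the same timeout.
                client.cancel(jobId).get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (ExecutionException | TimeoutException cancelFailure) {
                // Keep the original timeout as the primary failure.
                timeout.addSuppressed(cancelFailure);
            }
            throw timeout;
        }
    }
}
```

In the test, the call would replace the bare `jobResultFuture.get()`, with the timeout presumably derived from the existing `deadline`.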


> Migrate queryable state ITCases to use MiniClusterResource
> ----------------------------------------------------------
>
>                 Key: FLINK-8778
>                 URL: https://issues.apache.org/jira/browse/FLINK-8778
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
