[ 
https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433140#comment-15433140
 ] 

ASF GitHub Bot commented on FLINK-4273:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75902171
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -243,46 +247,46 @@ public static JobExecutionResult 
awaitJobResult(JobListeningContext listeningCon
                final JobID jobID = listeningContext.getJobID();
                final ActorRef jobClientActor = 
listeningContext.getJobClientActor();
                final Future<Object> jobSubmissionFuture = 
listeningContext.getJobResultFuture();
    +           final FiniteDuration askTimeout = listeningContext.getTimeout();
                // retrieves class loader if necessary
                final ClassLoader classLoader = 
listeningContext.getClassLoader();
     
    +           // wait for the future which holds the result to be ready
    +           // ping the JobClientActor from time to time to check if it is 
still running
                while (!jobSubmissionFuture.isCompleted()) {
                        try {
    -                           Thread.sleep(250);
    -                   } catch (InterruptedException e) {
    -                           throw new JobExecutionException(jobID, 
"Interrupted while waiting for execution result.", e);
    -                   }
    -
    -                   try {
    -                           Await.result(
    -                                   Patterns.ask(
    -                                           jobClientActor,
    -                                           JobClientMessages.getPing(),
    -                                           
Timeout.durationToTimeout(AkkaUtils.getDefaultTimeout())),
    -                                   AkkaUtils.getDefaultTimeout());
    +                           Await.ready(jobSubmissionFuture, askTimeout);
                        } catch (Exception e) {
    --- End diff --
    
    We throw the exception anyways afterwards. The only difference is that we 
wrap the exception and throw only if the future has not been completed in the 
meantime.
    
    We would have to catch `InterruptedException`, `TimeoutException`, and 
`IllegalArgumentException`. I'm not convinced this is necessary.


> Refactor JobClientActor to watch already submitted jobs 
> --------------------------------------------------------
>
>                 Key: FLINK-4273
>                 URL: https://issues.apache.org/jira/browse/FLINK-4273
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> The JobClientActor assumes that it receives a job, submits it, and waits for 
> the result. This process should be broken up into a submission process and a 
> waiting process which can both be entered independently. This leads to two 
> different entry points:
> 1) submit(job) -> wait
> 2) retrieve(jobID) -> wait



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to