[ 
https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15427967#comment-15427967
 ] 

ASF GitHub Bot commented on FLINK-4273:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75459375
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
                        sysoutLogUpdates);
     
                ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
    -           
    +
    +           Future<Object> submissionFuture = Patterns.ask(
    +                           jobClientActor,
    +                           new 
JobClientMessages.SubmitJobAndWait(jobGraph),
    +                           new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +           return new JobListeningContext(
    +                           jobGraph.getJobID(),
    +                           submissionFuture,
    +                           jobClientActor,
    +                           classLoader);
    +   }
    +
    +
    +   /**
    +    * Attaches to a running Job using the JobID.
    +    * Reconstructs the user class loader by downloading the jars from the 
JobManager.
    +    * @throws JobRetrievalException if anything goes wrong while 
retrieving the job
    +    */
    +   public static JobListeningContext attachToRunningJob(
    +                   JobID jobID,
    +                   ActorGateway jobManagerGateWay,
    +                   Configuration configuration,
    +                   ActorSystem actorSystem,
    +                   LeaderRetrievalService leaderRetrievalService,
    +                   FiniteDuration timeout,
    +                   boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +           checkNotNull(jobID, "The jobID must not be null.");
    +           checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not 
be null.");
    +           checkNotNull(configuration, "The configuration must not be 
null.");
    +           checkNotNull(actorSystem, "The actorSystem must not be null.");
    +           checkNotNull(leaderRetrievalService, "The jobManagerGateway 
must not be null.");
    +           checkNotNull(timeout, "The timeout must not be null.");
    +
    +           // retrieve classloader first before doing anything
    +           ClassLoader classloader;
    +           try {
    +                   classloader = retrieveClassLoader(jobID, 
jobManagerGateWay, configuration, timeout);
    --- End diff --
    
    True, this code assumes that the JobManager doesn't change between 
retrieving the leading jobmanager and retrieving the class loader. There is 
always some possible gap where the jobmanager could change. We could mitigate 
this by retrying in case is has changed.


> Refactor JobClientActor to watch already submitted jobs 
> --------------------------------------------------------
>
>                 Key: FLINK-4273
>                 URL: https://issues.apache.org/jira/browse/FLINK-4273
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> The JobClientActor assumes that it receives a job, submits it, and waits for 
> the result. This process should be broken up into a submission process and a 
> waiting process which can both be entered independently. This leads to two 
> different entry points:
> 1) submit(job) -> wait
> 2) retrieve(jobID) -> wait



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to