[ https://issues.apache.org/jira/browse/FLINK-4653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15522610#comment-15522610 ]

ASF GitHub Bot commented on FLINK-4653:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2524#discussion_r80441549
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobClient/JobClientUtils.java
 ---
    @@ -0,0 +1,257 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobClient;
    +
    +import akka.actor.ActorSystem;
    +import akka.actor.Address;
    +import akka.util.Timeout;
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.akka.ListeningBehaviour;
    +import org.apache.flink.runtime.blob.BlobCache;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.client.JobExecutionException;
    +import org.apache.flink.runtime.client.JobRetrievalException;
    +import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.util.NetUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Some;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.IOException;
    +import java.net.InetAddress;
    +import java.net.InetSocketAddress;
    +import java.net.URL;
    +import java.net.URLClassLoader;
    +import java.util.List;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * JobClientUtils is a utility for client.
    + * It offers the following methods:
    + * <ul>
    + *     <li>{@link #startJobClientRpcService(Configuration)} Starts a rpc 
service for client</li>
    + *     <li>{@link #retrieveRunningJobResult(JobID, JobMasterGateway, 
RpcService, LeaderRetrievalService, boolean, FiniteDuration, Configuration)}
    + *          Attaches to a running Job using the JobID, and wait for its 
job result</li>
    + *     <li>{@link #awaitJobResult(JobInfoTracker, ClassLoader)} Awaits the 
result of the job execution which jobInfoTracker listen for</li>
    + *     <li>{@link #retrieveClassLoader(JobID, JobMasterGateway, 
Configuration)} Reconstructs the class loader by first requesting information 
about it at the JobMaster
    + *          and then downloading missing jar files</li>
    + * </ul>
    + */
    +public class JobClientUtils {
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(JobClientUtils.class);
    +
    +
    +   /**
    +    * Starts an rpc service for the job client.
    +    *
    +    * <p>A remote actor system is created from the given configuration, listening on an
    +    * arbitrary port, and wrapped in an {@link AkkaRpcService} whose timeout is the
    +    * client timeout taken from the same configuration.
    +    *
    +    * @param config the flink configuration, used to create the actor system and to look
    +    *               up the client timeout
    +    * @return an rpc service backed by the newly started actor system
    +    * @throws IOException if the host of the actor system's default address cannot be
    +    *                     resolved via {@link InetAddress#getByName(String)}
    +    */
    +   public static RpcService startJobClientRpcService(Configuration config)
    +           throws IOException
    +   {
    +           LOG.info("Starting JobClientUtils rpc service");
    +           // host "" and port 0: presumably lets Akka choose the bind host and a free
    +           // port (see the comment below) — TODO confirm against AkkaUtils.createActorSystem
    +           Option<Tuple2<String, Object>> remoting = new Some<>(new 
Tuple2<String, Object>("", 0));
    +
    +           // start a remote actor system to listen on an arbitrary port
    +           ActorSystem system = AkkaUtils.createActorSystem(config, 
remoting);
    +           Address address = system.provider().getDefaultAddress();
    +
    +           // hostAddress and port are only used for the log message below; missing
    +           // parts fall back to "(unknown)" and -1 respectively.
    +           // NOTE(review): InetAddress.getByName can throw (UnknownHostException); in that
    +           // case the actor system created above is never shut down. Consider shutting it
    +           // down on failure.
    +           String hostAddress = address.host().isDefined() ?
    +                   
NetUtils.ipAddressToUrlString(InetAddress.getByName(address.host().get())) :
    +                   "(unknown)";
    +           int port = address.port().isDefined() ? ((Integer) 
address.port().get()) : -1;
    +           LOG.info("Started JobClientUtils actor system at " + 
hostAddress + ':' + port);
    +
    +           // the configured client timeout becomes the rpc service's timeout
    +           Timeout timeout = new 
Timeout(AkkaUtils.getClientTimeout(config));
    +           return new AkkaRpcService(system, timeout);
    +   }
    +
    +   /**
    +    * Attaches to a running job identified by its JobID and waits for its job result.
    +    *
    +    * <p>A {@link JobInfoTracker} is started on the given rpc service, its address is
    +    * registered at the JobMaster, the user code class loader is reconstructed, and the
    +    * job result is awaited. The tracker is shut down before this method returns, whether
    +    * it returns normally or exceptionally.
    +    *
    +    * @param jobID                  id of the job to attach to
    +    * @param jobMasterGateway       gateway to the JobMaster of the job
    +    * @param rpcService             rpc service passed to the JobInfoTracker
    +    * @param leaderRetrievalService leader retrieval service of the JobMaster
    +    * @param sysoutLogUpdates       whether status messages shall be printed to sysout
    +    * @param timeout                timeout for registering the client at the JobMaster
    +    * @param configuration          the flink configuration, used to reconstruct the class
    +    *                               loader for the job result
    +    * @return the execution result of the job
    +    * @throws JobExecutionException if attaching to the job or awaiting its result fails
    +    */
    +   public static JobExecutionResult retrieveRunningJobResult(
    +           JobID jobID,
    +           JobMasterGateway jobMasterGateway,
    +           RpcService rpcService,
    +           LeaderRetrievalService leaderRetrievalService,
    +           boolean sysoutLogUpdates,
    +           FiniteDuration timeout,
    +           Configuration configuration) throws JobExecutionException
    +   {
    +
    +           checkNotNull(jobID, "The jobID must not be null.");
    +           checkNotNull(jobMasterGateway, "The jobMasterGateway must not 
be null.");
    +           checkNotNull(rpcService, "The rpcService must not be null.");
    +           checkNotNull(leaderRetrievalService, "The 
leaderRetrievalService must not be null.");
    +           checkNotNull(timeout, "The timeout must not be null");
    +           checkNotNull(configuration, "The configuration must not be 
null");
    +
    +           // declared outside the try block so that the finally block can shut down a
    +           // tracker that was created even if a later step throws
    +           JobInfoTracker jobInfoTracker = null;
    +           try {
    +                   jobInfoTracker = new JobInfoTracker(rpcService, 
leaderRetrievalService, jobID, sysoutLogUpdates);
    +                   jobInfoTracker.start();
    +                   // register the tracker's address at the JobMaster; presumably so the
    +                   // JobMaster reports job state changes/results to it — TODO confirm
    +                   registerClientAtJobMaster(jobID, 
jobInfoTracker.getAddress(), jobMasterGateway, timeout);
    +                   // class loader used to parse the job result
    +                   ClassLoader classLoader = retrieveClassLoader(jobID, 
jobMasterGateway, configuration);
    +                   // NOTE(review): awaitJobResult appears to retry on TimeoutException in
    +                   // a while(true) loop, so this call may never return if timeouts keep
    +                   // recurring.
    +                   return awaitJobResult(jobInfoTracker, classLoader);
    +           } finally {
    +                   if (jobInfoTracker != null) {
    +                           jobInfoTracker.shutDown();
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Awaits the result of the job execution which jobInfoTracker listen 
for
    +    *
    +    * @param jobInfoTracker job info tracker
    +    * @param classLoader    classloader to parse the job result
    +    * @return
    +    * @throws JobExecutionException
    +    */
    +   public static JobExecutionResult awaitJobResult(JobInfoTracker 
jobInfoTracker,
    +           ClassLoader classLoader) throws JobExecutionException
    +   {
    +           try {
    +                   while (true) {
    +                           Future<JobExecutionResult> 
jobExecutionResultFuture = jobInfoTracker.getJobExecutionResult(classLoader);
    +                           try {
    +                                   JobExecutionResult jobExecutionResult = 
Await.result(jobExecutionResultFuture, new 
Timeout(AkkaUtils.INF_TIMEOUT()).duration());
    +                                   return jobExecutionResult;
    +                           } catch (TimeoutException e) {
    +                                   // ignore timeout exception, retry
    --- End diff --
    
    I'm not sure whether this will cause an infinite loop in case of recurring connection timeouts.


> Refactor JobClientActor to adapt to the new Rpc framework and new cluster management
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-4653
>                 URL: https://issues.apache.org/jira/browse/FLINK-4653
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>            Reporter: zhangjing
>            Assignee: zhangjing
>             Fix For: 1.2.0
>
>
> 1. Create an RpcEndpoint (temporarily named JobInfoTracker) and an RpcGateway
> (temporarily named JobInfoTrackerGateway) to replace the old JobClientActor.
> 2. Replace the rpc message communication in JobClientActor with rpc method calls, to
> fit the new rpc framework.
> 3. JobInfoTracker is responsible for waiting for the jobStateChange and jobResult until
> the job completes. However, it is not responsible for submitting new jobs, because job
> submission behaves differently in different clusters.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to