Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2524#discussion_r80441549
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobClient/JobClientUtils.java
 ---
    @@ -0,0 +1,257 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobClient;
    +
    +import akka.actor.ActorSystem;
    +import akka.actor.Address;
    +import akka.util.Timeout;
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.akka.ListeningBehaviour;
    +import org.apache.flink.runtime.blob.BlobCache;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.client.JobExecutionException;
    +import org.apache.flink.runtime.client.JobRetrievalException;
    +import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.util.NetUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Some;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.IOException;
    +import java.net.InetAddress;
    +import java.net.InetSocketAddress;
    +import java.net.URL;
    +import java.net.URLClassLoader;
    +import java.util.List;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * JobClientUtils is a utility for client.
    + * It offers the following methods:
    + * <ul>
    + *     <li>{@link #startJobClientRpcService(Configuration)} Starts a rpc 
service for client</li>
    + *     <li>{@link #retrieveRunningJobResult(JobID, JobMasterGateway, 
RpcService, LeaderRetrievalService, boolean, FiniteDuration, Configuration)}
    + *          Attaches to a running Job using the JobID, and wait for its 
job result</li>
    + *     <li>{@link #awaitJobResult(JobInfoTracker, ClassLoader)} Awaits the 
result of the job execution which jobInfoTracker listen for</li>
    + *     <li>{@link #retrieveClassLoader(JobID, JobMasterGateway, 
Configuration)} Reconstructs the class loader by first requesting information 
about it at the JobMaster
    + *          and then downloading missing jar files</li>
    + * </ul>
    + */
    +public class JobClientUtils {
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(JobClientUtils.class);
    +
    +
    +   /**
    +    * Starts a rpc service for client
    +    *
    +    * @param config the flink configuration
    +    * @return
    +    * @throws IOException
    +    */
    +   public static RpcService startJobClientRpcService(Configuration config)
    +           throws IOException
    +   {
    +           LOG.info("Starting JobClientUtils rpc service");
    +           Option<Tuple2<String, Object>> remoting = new Some<>(new 
Tuple2<String, Object>("", 0));
    +
    +           // start a remote actor system to listen on an arbitrary port
    +           ActorSystem system = AkkaUtils.createActorSystem(config, 
remoting);
    +           Address address = system.provider().getDefaultAddress();
    +
    +           String hostAddress = address.host().isDefined() ?
    +                   
NetUtils.ipAddressToUrlString(InetAddress.getByName(address.host().get())) :
    +                   "(unknown)";
    +           int port = address.port().isDefined() ? ((Integer) 
address.port().get()) : -1;
    +           LOG.info("Started JobClientUtils actor system at " + 
hostAddress + ':' + port);
    +
    +           Timeout timeout = new 
Timeout(AkkaUtils.getClientTimeout(config));
    +           return new AkkaRpcService(system, timeout);
    +   }
    +
    +   /**
    +    * Attaches to a running Job using the JobID, and wait for its job 
result
    +    *
    +    * @param jobID                  id of job
    +    * @param jobMasterGateway       gateway to the JobMaster
    +    * @param rpcService
    +    * @param leaderRetrievalService leader retriever service of jobMaster
    +    * @param sysoutLogUpdates       whether status messages shall be 
printed to sysout
    +    * @param timeout                register timeout
    +    * @param configuration          the flink configuration
    +    * @return
    +    * @throws JobExecutionException
    +    */
    +   public static JobExecutionResult retrieveRunningJobResult(
    +           JobID jobID,
    +           JobMasterGateway jobMasterGateway,
    +           RpcService rpcService,
    +           LeaderRetrievalService leaderRetrievalService,
    +           boolean sysoutLogUpdates,
    +           FiniteDuration timeout,
    +           Configuration configuration) throws JobExecutionException
    +   {
    +
    +           checkNotNull(jobID, "The jobID must not be null.");
    +           checkNotNull(jobMasterGateway, "The jobMasterGateway must not 
be null.");
    +           checkNotNull(rpcService, "The rpcService must not be null.");
    +           checkNotNull(leaderRetrievalService, "The 
leaderRetrievalService must not be null.");
    +           checkNotNull(timeout, "The timeout must not be null");
    +           checkNotNull(configuration, "The configuration must not be 
null");
    +
    +           JobInfoTracker jobInfoTracker = null;
    +           try {
    +                   jobInfoTracker = new JobInfoTracker(rpcService, 
leaderRetrievalService, jobID, sysoutLogUpdates);
    +                   jobInfoTracker.start();
    +                   registerClientAtJobMaster(jobID, 
jobInfoTracker.getAddress(), jobMasterGateway, timeout);
    +                   ClassLoader classLoader = retrieveClassLoader(jobID, 
jobMasterGateway, configuration);
    +                   return awaitJobResult(jobInfoTracker, classLoader);
    +           } finally {
    +                   if (jobInfoTracker != null) {
    +                           jobInfoTracker.shutDown();
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Awaits the result of the job execution which jobInfoTracker listen 
for
    +    *
    +    * @param jobInfoTracker job info tracker
    +    * @param classLoader    classloader to parse the job result
    +    * @return
    +    * @throws JobExecutionException
    +    */
    +   public static JobExecutionResult awaitJobResult(JobInfoTracker 
jobInfoTracker,
    +           ClassLoader classLoader) throws JobExecutionException
    +   {
    +           try {
    +                   while (true) {
    +                           Future<JobExecutionResult> 
jobExecutionResultFuture = jobInfoTracker.getJobExecutionResult(classLoader);
    +                           try {
    +                                   JobExecutionResult jobExecutionResult = 
Await.result(jobExecutionResultFuture, new 
Timeout(AkkaUtils.INF_TIMEOUT()).duration());
    +                                   return jobExecutionResult;
    +                           } catch (TimeoutException e) {
    +                                   // ignore timeout exception, retry
    --- End diff --
    
    I'm not sure whether this will cause an infinite loop in case of recurring 
connection timeouts.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to