GitHub user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2524#discussion_r80441549 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobClient/JobClientUtils.java --- @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.jobClient; + +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.util.Timeout; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.blob.BlobCache; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobRetrievalException; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Some; +import scala.Tuple2; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * JobClientUtils is a utility for client. 
+ * It offers the following methods: + * <ul> + * <li>{@link #startJobClientRpcService(Configuration)} Starts a rpc service for client</li> + * <li>{@link #retrieveRunningJobResult(JobID, JobMasterGateway, RpcService, LeaderRetrievalService, boolean, FiniteDuration, Configuration)} + * Attaches to a running Job using the JobID, and wait for its job result</li> + * <li>{@link #awaitJobResult(JobInfoTracker, ClassLoader)} Awaits the result of the job execution which jobInfoTracker listen for</li> + * <li>{@link #retrieveClassLoader(JobID, JobMasterGateway, Configuration)} Reconstructs the class loader by first requesting information about it at the JobMaster + * and then downloading missing jar files</li> + * </ul> + */ +public class JobClientUtils { + + private static final Logger LOG = LoggerFactory.getLogger(JobClientUtils.class); + + + /** + * Starts a rpc service for client + * + * @param config the flink configuration + * @return + * @throws IOException + */ + public static RpcService startJobClientRpcService(Configuration config) + throws IOException + { + LOG.info("Starting JobClientUtils rpc service"); + Option<Tuple2<String, Object>> remoting = new Some<>(new Tuple2<String, Object>("", 0)); + + // start a remote actor system to listen on an arbitrary port + ActorSystem system = AkkaUtils.createActorSystem(config, remoting); + Address address = system.provider().getDefaultAddress(); + + String hostAddress = address.host().isDefined() ? + NetUtils.ipAddressToUrlString(InetAddress.getByName(address.host().get())) : + "(unknown)"; + int port = address.port().isDefined() ? 
((Integer) address.port().get()) : -1; + LOG.info("Started JobClientUtils actor system at " + hostAddress + ':' + port); + + Timeout timeout = new Timeout(AkkaUtils.getClientTimeout(config)); + return new AkkaRpcService(system, timeout); + } + + /** + * Attaches to a running Job using the JobID, and wait for its job result + * + * @param jobID id of job + * @param jobMasterGateway gateway to the JobMaster + * @param rpcService + * @param leaderRetrievalService leader retriever service of jobMaster + * @param sysoutLogUpdates whether status messages shall be printed to sysout + * @param timeout register timeout + * @param configuration the flink configuration + * @return + * @throws JobExecutionException + */ + public static JobExecutionResult retrieveRunningJobResult( + JobID jobID, + JobMasterGateway jobMasterGateway, + RpcService rpcService, + LeaderRetrievalService leaderRetrievalService, + boolean sysoutLogUpdates, + FiniteDuration timeout, + Configuration configuration) throws JobExecutionException + { + + checkNotNull(jobID, "The jobID must not be null."); + checkNotNull(jobMasterGateway, "The jobMasterGateway must not be null."); + checkNotNull(rpcService, "The rpcService must not be null."); + checkNotNull(leaderRetrievalService, "The leaderRetrievalService must not be null."); + checkNotNull(timeout, "The timeout must not be null"); + checkNotNull(configuration, "The configuration must not be null"); + + JobInfoTracker jobInfoTracker = null; + try { + jobInfoTracker = new JobInfoTracker(rpcService, leaderRetrievalService, jobID, sysoutLogUpdates); + jobInfoTracker.start(); + registerClientAtJobMaster(jobID, jobInfoTracker.getAddress(), jobMasterGateway, timeout); + ClassLoader classLoader = retrieveClassLoader(jobID, jobMasterGateway, configuration); + return awaitJobResult(jobInfoTracker, classLoader); + } finally { + if (jobInfoTracker != null) { + jobInfoTracker.shutDown(); + } + } + } + + /** + * Awaits the result of the job execution which 
jobInfoTracker listen for + * + * @param jobInfoTracker job info tracker + * @param classLoader classloader to parse the job result + * @return + * @throws JobExecutionException + */ + public static JobExecutionResult awaitJobResult(JobInfoTracker jobInfoTracker, + ClassLoader classLoader) throws JobExecutionException + { + try { + while (true) { + Future<JobExecutionResult> jobExecutionResultFuture = jobInfoTracker.getJobExecutionResult(classLoader); + try { + JobExecutionResult jobExecutionResult = Await.result(jobExecutionResultFuture, new Timeout(AkkaUtils.INF_TIMEOUT()).duration()); + return jobExecutionResult; + } catch (TimeoutException e) { + // ignore timeout exception, retry --- End diff -- I'm not sure whether this could cause an infinite loop in the case of recurring connection timeouts: the `while (true)` loop retries unconditionally every time a `TimeoutException` is caught, so nothing bounds the number of retries.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---