[ https://issues.apache.org/jira/browse/FLINK-4653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15522610#comment-15522610 ]
ASF GitHub Bot commented on FLINK-4653: --------------------------------------- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2524#discussion_r80441549 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobClient/JobClientUtils.java --- @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobClient; + +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.util.Timeout; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.blob.BlobCache; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobRetrievalException; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Some; +import scala.Tuple2; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * JobClientUtils is a utility for client. 
+ * It offers the following methods: + * <ul> + * <li>{@link #startJobClientRpcService(Configuration)} Starts a rpc service for client</li> + * <li>{@link #retrieveRunningJobResult(JobID, JobMasterGateway, RpcService, LeaderRetrievalService, boolean, FiniteDuration, Configuration)} + * Attaches to a running Job using the JobID, and wait for its job result</li> + * <li>{@link #awaitJobResult(JobInfoTracker, ClassLoader)} Awaits the result of the job execution which jobInfoTracker listen for</li> + * <li>{@link #retrieveClassLoader(JobID, JobMasterGateway, Configuration)} Reconstructs the class loader by first requesting information about it at the JobMaster + * and then downloading missing jar files</li> + * </ul> + */ +public class JobClientUtils { + + private static final Logger LOG = LoggerFactory.getLogger(JobClientUtils.class); + + + /** + * Starts a rpc service for client + * + * @param config the flink configuration + * @return + * @throws IOException + */ + public static RpcService startJobClientRpcService(Configuration config) + throws IOException + { + LOG.info("Starting JobClientUtils rpc service"); + Option<Tuple2<String, Object>> remoting = new Some<>(new Tuple2<String, Object>("", 0)); + + // start a remote actor system to listen on an arbitrary port + ActorSystem system = AkkaUtils.createActorSystem(config, remoting); + Address address = system.provider().getDefaultAddress(); + + String hostAddress = address.host().isDefined() ? + NetUtils.ipAddressToUrlString(InetAddress.getByName(address.host().get())) : + "(unknown)"; + int port = address.port().isDefined() ? 
((Integer) address.port().get()) : -1; + LOG.info("Started JobClientUtils actor system at " + hostAddress + ':' + port); + + Timeout timeout = new Timeout(AkkaUtils.getClientTimeout(config)); + return new AkkaRpcService(system, timeout); + } + + /** + * Attaches to a running Job using the JobID, and wait for its job result + * + * @param jobID id of job + * @param jobMasterGateway gateway to the JobMaster + * @param rpcService + * @param leaderRetrievalService leader retriever service of jobMaster + * @param sysoutLogUpdates whether status messages shall be printed to sysout + * @param timeout register timeout + * @param configuration the flink configuration + * @return + * @throws JobExecutionException + */ + public static JobExecutionResult retrieveRunningJobResult( + JobID jobID, + JobMasterGateway jobMasterGateway, + RpcService rpcService, + LeaderRetrievalService leaderRetrievalService, + boolean sysoutLogUpdates, + FiniteDuration timeout, + Configuration configuration) throws JobExecutionException + { + + checkNotNull(jobID, "The jobID must not be null."); + checkNotNull(jobMasterGateway, "The jobMasterGateway must not be null."); + checkNotNull(rpcService, "The rpcService must not be null."); + checkNotNull(leaderRetrievalService, "The leaderRetrievalService must not be null."); + checkNotNull(timeout, "The timeout must not be null"); + checkNotNull(configuration, "The configuration must not be null"); + + JobInfoTracker jobInfoTracker = null; + try { + jobInfoTracker = new JobInfoTracker(rpcService, leaderRetrievalService, jobID, sysoutLogUpdates); + jobInfoTracker.start(); + registerClientAtJobMaster(jobID, jobInfoTracker.getAddress(), jobMasterGateway, timeout); + ClassLoader classLoader = retrieveClassLoader(jobID, jobMasterGateway, configuration); + return awaitJobResult(jobInfoTracker, classLoader); + } finally { + if (jobInfoTracker != null) { + jobInfoTracker.shutDown(); + } + } + } + + /** + * Awaits the result of the job execution which 
jobInfoTracker listen for + * + * @param jobInfoTracker job info tracker + * @param classLoader classloader to parse the job result + * @return + * @throws JobExecutionException + */ + public static JobExecutionResult awaitJobResult(JobInfoTracker jobInfoTracker, + ClassLoader classLoader) throws JobExecutionException + { + try { + while (true) { + Future<JobExecutionResult> jobExecutionResultFuture = jobInfoTracker.getJobExecutionResult(classLoader); + try { + JobExecutionResult jobExecutionResult = Await.result(jobExecutionResultFuture, new Timeout(AkkaUtils.INF_TIMEOUT()).duration()); + return jobExecutionResult; + } catch (TimeoutException e) { + // ignore timeout exception, retry --- End diff -- I'm not sure if that will cause an infinite loop in case of reoccurring connection timeouts. > Refactor JobClientActor to adapt to the new Rpc framework and new cluster > management > ------------------------------------------------------------------------------------- > > Key: FLINK-4653 > URL: https://issues.apache.org/jira/browse/FLINK-4653 > Project: Flink > Issue Type: Sub-task > Components: Client > Reporter: zhangjing > Assignee: zhangjing > Fix For: 1.2.0 > > > 1. Create a RpcEndpoint(temporarily named JobInfoTracker) and > RpcGateway(temporarily named JobInfoTrackerGateway) to replace the old > JobClientActor. > 2. Change rpc message communication in JobClientActor to rpc method call to > apply to the new rpc framework. > 3. JobInfoTracker is responsible for waiting for the jobStateChange and > jobResult until the job completes. But it is not responsible for submitting new jobs > because jobSubmission behavior differs between clusters -- This message was sent by Atlassian JIRA (v6.3.4#6332)