[
https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15428048#comment-15428048
]
ASF GitHub Bot commented on FLINK-4273:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2313#discussion_r75468671
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
ActorRef jobClientActor =
actorSystem.actorOf(jobClientActorProps);
-
+
+ Future<Object> submissionFuture = Patterns.ask(
+ jobClientActor,
+ new
JobClientMessages.SubmitJobAndWait(jobGraph),
+ new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+ return new JobListeningContext(
+ jobGraph.getJobID(),
+ submissionFuture,
+ jobClientActor,
+ classLoader);
+ }
+
+
+ /**
+ * Attaches to a running Job using the JobID.
+ * Reconstructs the user class loader by downloading the jars from the
JobManager.
+ * @throws JobRetrievalException if anything goes wrong while
retrieving the job
+ */
+ public static JobListeningContext attachToRunningJob(
+ JobID jobID,
+ ActorGateway jobManagerGateWay,
+ Configuration configuration,
+ ActorSystem actorSystem,
+ LeaderRetrievalService leaderRetrievalService,
+ FiniteDuration timeout,
+ boolean sysoutLogUpdates) throws JobRetrievalException {
+
+ checkNotNull(jobID, "The jobID must not be null.");
+ checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not
be null.");
+ checkNotNull(configuration, "The configuration must not be
null.");
+ checkNotNull(actorSystem, "The actorSystem must not be null.");
+ checkNotNull(leaderRetrievalService, "The jobManagerGateway
must not be null.");
+ checkNotNull(timeout, "The timeout must not be null.");
+
+ // retrieve classloader first before doing anything
+ ClassLoader classloader;
+ try {
+ classloader = retrieveClassLoader(jobID,
jobManagerGateWay, configuration, timeout);
+ LOG.info("Reconstructed class loader for Job {}" ,
jobID);
+ } catch (Exception e) {
+ LOG.warn("Couldn't retrieve classloader for {}. Using
system class loader", jobID, e);
+ classloader = JobClient.class.getClassLoader();
+ }
+
+ // we create a proxy JobClientActor that deals with all
communication with
+ // the JobManager. It forwards the job submission, checks the
success/failure responses, logs
+ // update messages, watches for disconnect between client and
JobManager, ...
+ Props jobClientActorProps =
JobClientActor.createJobClientActorProps(
+ leaderRetrievalService,
+ timeout,
+ sysoutLogUpdates);
+
+ ActorRef jobClientActor =
actorSystem.actorOf(jobClientActorProps);
+
+ Future<Object> attachmentFuture = Patterns.ask(
+ jobClientActor,
+ new JobClientMessages.AttachToJobAndWait(jobID),
+ new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+ return new JobListeningContext(
+ jobID,
+ attachmentFuture,
+ jobClientActor,
+ classloader);
+ }
+
+ /**
+ * Reconstructs the class loader by first requesting information about
it at the JobManager
+ * and then downloading missing jar files.
+ * @param jobID id of job
+ * @param jobManager gateway to the JobManager
+ * @param config the flink configuration
+ * @param timeout timeout for querying the jobmanager
+ * @return A classloader that should behave like the original
classloader
+ * @throws JobRetrievalException if anything goes wrong
+ */
+ public static ClassLoader retrieveClassLoader(
+ JobID jobID,
+ ActorGateway jobManager,
+ Configuration config,
+ FiniteDuration timeout)
+ throws JobRetrievalException {
+
+ final Object jmAnswer;
+ try {
+ jmAnswer = Await.result(
+ jobManager.ask(
+ new
JobManagerMessages.RequestClassloadingProps(jobID), timeout), timeout);
+ } catch (Exception e) {
+ throw new JobRetrievalException(jobID, "JobManager
didn't respond", e);
+ }
+
+ if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
+ JobManagerMessages.ClassloadingProps props =
((JobManagerMessages.ClassloadingProps) jmAnswer);
+
+ Option<String> jmHost =
jobManager.actor().path().address().host();
+ String jmHostname = jmHost.isDefined() ? jmHost.get() :
"localhost";
+ InetSocketAddress serverAddress = new
InetSocketAddress(jmHostname, props.blobManagerPort());
+ final BlobCache blobClient = new
BlobCache(serverAddress, config);
+
+ final List<BlobKey> requiredJarFiles =
props.requiredJarFiles();
+ final List<URL> requiredClasspaths =
props.requiredClasspaths();
+
+ final URL[] allURLs = new URL[requiredJarFiles.size() +
requiredClasspaths.size()];
+
+ int pos = 0;
+ for (BlobKey blobKey : props.requiredJarFiles()) {
+ try {
+ allURLs[pos++] =
blobClient.getURL(blobKey);
+ } catch (Exception e) {
+ blobClient.shutdown();
+ throw new JobRetrievalException(jobID,
"Failed to download BlobKey " + blobKey);
+ }
+ }
+
+ for (URL url : requiredClasspaths) {
+ allURLs[pos++] = url;
+ }
+
+ return new URLClassLoader(allURLs,
JobClient.class.getClassLoader());
+ } else if (jmAnswer instanceof JobManagerMessages.JobNotFound) {
+ throw new JobRetrievalException(jobID, "Couldn't
retrieve class loader. Job " + jobID + " not found");
+ } else {
+ throw new JobRetrievalException(jobID, "Unknown
response from JobManager: " + jmAnswer);
+ }
+ }
+
+ /**
+ * Given a JobListeningContext, awaits the result of the job execution
that this context is bound to
+ * @param listeningContext The listening context of the job execution
+ * @return The result of the execution
+ * @throws JobExecutionException if anything goes wrong while
monitoring the job
+ */
+ public static JobExecutionResult awaitJobResult(JobListeningContext
listeningContext) throws JobExecutionException {
+
+ final JobID jobID = listeningContext.jobID;
+ final Future<Object> jobSubmissionFuture =
listeningContext.jobResultFuture;
+ final ClassLoader classLoader = listeningContext.classLoader;
+
// first block handles errors while waiting for the result
- Object answer;
+ final Object answer;
try {
- Future<Object> future = Patterns.ask(jobClientActor,
- new
JobClientMessages.SubmitJobAndWait(jobGraph),
- new Timeout(AkkaUtils.INF_TIMEOUT()));
-
- answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
+ answer = Await.result(jobSubmissionFuture,
AkkaUtils.INF_TIMEOUT());
--- End diff --
Okay, I will ping the actor periodically to check whether it is still alive.
> Refactor JobClientActor to watch already submitted jobs
> --------------------------------------------------------
>
> Key: FLINK-4273
> URL: https://issues.apache.org/jira/browse/FLINK-4273
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Minor
> Fix For: 1.2.0
>
>
> The JobClientActor assumes that it receives a job, submits it, and waits for
> the result. This process should be broken up into a submission process and a
> waiting process which can both be entered independently. This leads to two
> different entry points:
> 1) submit(job) -> wait
> 2) retrieve(jobID) -> wait
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)