[
https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15428137#comment-15428137
]
ASF GitHub Bot commented on FLINK-4273:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2313#discussion_r75474811
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
ActorRef jobClientActor =
actorSystem.actorOf(jobClientActorProps);
-
+
+ Future<Object> submissionFuture = Patterns.ask(
+ jobClientActor,
+ new JobClientMessages.SubmitJobAndWait(jobGraph),
+ new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+ return new JobListeningContext(
+ jobGraph.getJobID(),
+ submissionFuture,
+ jobClientActor,
+ classLoader);
+ }
+
+
+ /**
+ * Attaches to a running Job using the JobID.
+ * Reconstructs the user class loader by downloading the jars from the JobManager.
+ * @throws JobRetrievalException if anything goes wrong while retrieving the job
+ */
+ public static JobListeningContext attachToRunningJob(
+ JobID jobID,
+ ActorGateway jobManagerGateWay,
+ Configuration configuration,
+ ActorSystem actorSystem,
+ LeaderRetrievalService leaderRetrievalService,
+ FiniteDuration timeout,
+ boolean sysoutLogUpdates) throws JobRetrievalException {
+
+ checkNotNull(jobID, "The jobID must not be null.");
+ checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
+ checkNotNull(configuration, "The configuration must not be null.");
+ checkNotNull(actorSystem, "The actorSystem must not be null.");
+ checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
+ checkNotNull(timeout, "The timeout must not be null.");
+
+ // retrieve classloader first before doing anything
+ ClassLoader classloader;
+ try {
+ classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
--- End diff --
Actually, I'm not sure how to handle this corner case. We don't typically
retry client-side operations if the leader changes after we have retrieved
it. Instead, we just throw an error (see the methods in `ClusterClient`).
The `JobClientActor` is the exception here, and it has to be, because it
operates independently of the user function.

So we could fail if we can't reconstruct the class loader. The caveat is
that job retrieval may then fail even if the user doesn't use custom classes
for the JobExecutionResult or exceptions (e.g. when a firewall blocks the
BlobManager port). That's why I didn't want to enforce this step. But we
could enforce it and fix any problems with the BlobManager communication
that come up.
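
To make the two options concrete, here is a minimal sketch of "fail" versus "fall back" when the class loader can't be reconstructed. It is not the code from the pull request: `ClassLoaderRetrievalSketch`, `ClassLoaderSource`, `RetrievalException`, `retrieveOrFail` and `retrieveOrFallBack` are hypothetical names made up for illustration, and the real retrieval step is reduced to a single callback.

```java
// Hypothetical sketch; none of these names exist in Flink.
final class ClassLoaderRetrievalSketch {

    /** Stand-in for the step that downloads the job's jars and builds a class loader. */
    interface ClassLoaderSource {
        ClassLoader retrieve() throws Exception;
    }

    /** Stand-in for the JobRetrievalException mentioned in the diff. */
    static final class RetrievalException extends Exception {
        RetrievalException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Strict policy: any failure to rebuild the class loader fails the attach. */
    static ClassLoader retrieveOrFail(ClassLoaderSource source) throws RetrievalException {
        try {
            return source.retrieve();
        } catch (Exception e) {
            throw new RetrievalException("Could not reconstruct the user class loader.", e);
        }
    }

    /** Lenient policy: log the failure and continue with the given fallback class loader. */
    static ClassLoader retrieveOrFallBack(ClassLoaderSource source, ClassLoader fallback) {
        try {
            return source.retrieve();
        } catch (Exception e) {
            // With the fallback, custom result or exception classes may fail to
            // deserialize later, but jobs that use only standard classes still work.
            System.err.println("Class loader retrieval failed, using fallback: " + e);
            return fallback;
        }
    }
}
```

The strict variant surfaces BlobManager connectivity problems right away. The lenient one keeps retrieval working for jobs that don't need custom classes, at the cost of a possible deserialization error later.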
> Refactor JobClientActor to watch already submitted jobs
> --------------------------------------------------------
>
> Key: FLINK-4273
> URL: https://issues.apache.org/jira/browse/FLINK-4273
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Minor
> Fix For: 1.2.0
>
>
> The JobClientActor assumes that it receives a job, submits it, and waits for
> the result. This process should be broken up into a submission process and a
> waiting process which can both be entered independently. This leads to two
> different entry points:
> 1) submit(job) -> wait
> 2) retrieve(jobID) -> wait
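
The two entry points in the description can be sketched roughly as follows. This is only an illustration under simplifying assumptions: `JobEntryPointsSketch`, `ListeningContext`, `submit`, `retrieve`, `finish` and `awaitResult` are hypothetical names, and an in-memory map stands in for the JobManager. The point is that both entry points hand back the same kind of handle, so the waiting phase doesn't care how the job was reached.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; an in-memory map stands in for the JobManager.
final class JobEntryPointsSketch {

    /** Handle shared by both entry points; waiting only needs this. */
    static final class ListeningContext {
        final String jobId;
        final CompletableFuture<String> result;

        ListeningContext(String jobId, CompletableFuture<String> result) {
            this.jobId = jobId;
            this.result = result;
        }
    }

    private final Map<String, CompletableFuture<String>> running = new ConcurrentHashMap<>();

    /** Entry point 1: submit a new job and get a handle to wait on. */
    ListeningContext submit(String jobId) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (running.putIfAbsent(jobId, result) != null) {
            throw new IllegalStateException("Job already submitted: " + jobId);
        }
        return new ListeningContext(jobId, result);
    }

    /** Entry point 2: attach to a job that is already running. */
    ListeningContext retrieve(String jobId) {
        CompletableFuture<String> result = running.get(jobId);
        if (result == null) {
            throw new IllegalArgumentException("Unknown job: " + jobId);
        }
        return new ListeningContext(jobId, result);
    }

    /** Simulates the cluster reporting that a job has finished. */
    void finish(String jobId, String outcome) {
        CompletableFuture<String> result = running.get(jobId);
        if (result != null) {
            result.complete(outcome);
        }
    }

    /** Shared waiting phase: blocks until the job result is available. */
    static String awaitResult(ListeningContext context) {
        return context.result.join();
    }
}
```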