[
https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15427967#comment-15427967
]
ASF GitHub Bot commented on FLINK-4273:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2313#discussion_r75459375
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
-
+
+ Future<Object> submissionFuture = Patterns.ask(
+ jobClientActor,
+ new JobClientMessages.SubmitJobAndWait(jobGraph),
+ new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+ return new JobListeningContext(
+ jobGraph.getJobID(),
+ submissionFuture,
+ jobClientActor,
+ classLoader);
+ }
+
+
+ /**
+ * Attaches to a running Job using the JobID.
+ * Reconstructs the user class loader by downloading the jars from the JobManager.
+ * @throws JobRetrievalException if anything goes wrong while retrieving the job
+ */
+ public static JobListeningContext attachToRunningJob(
+ JobID jobID,
+ ActorGateway jobManagerGateWay,
+ Configuration configuration,
+ ActorSystem actorSystem,
+ LeaderRetrievalService leaderRetrievalService,
+ FiniteDuration timeout,
+ boolean sysoutLogUpdates) throws JobRetrievalException {
+
+ checkNotNull(jobID, "The jobID must not be null.");
+ checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
+ checkNotNull(configuration, "The configuration must not be null.");
+ checkNotNull(actorSystem, "The actorSystem must not be null.");
+ checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
+ checkNotNull(timeout, "The timeout must not be null.");
+
+ // retrieve classloader first before doing anything
+ ClassLoader classloader;
+ try {
+ classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
--- End diff --
True, this code assumes that the JobManager doesn't change between
retrieving the leading JobManager and retrieving the class loader. There is
always a window in which the JobManager could change. We could mitigate
this by retrying if it has changed.
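
As a rough illustration of the retry idea (not code from this pull request), the sketch below re-reads the leader ID after the retrieval and retries if it differs from the one seen before. `LeaderStableRetry`, `callWithStableLeader`, the `Supplier<UUID>` leader lookup and `maxAttempts` are hypothetical names used only for illustration; none of them are Flink APIs.

```java
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.function.Supplier;

// Hypothetical helper, not part of Flink: accepts a result only if the
// leader was the same before and after the call, and retries otherwise.
final class LeaderStableRetry {

    private LeaderStableRetry() {}

    public static <T> T callWithStableLeader(
            Supplier<UUID> currentLeaderId, // assumed way to look up the current leader ID
            Callable<T> action,             // e.g. the class loader retrieval
            int maxAttempts) throws Exception {

        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1.");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            UUID before = currentLeaderId.get();
            T result = action.call();
            UUID after = currentLeaderId.get();
            if (before.equals(after)) {
                return result;
            }
            // The leader changed while the action was running: discard the result and retry.
        }
        throw new IllegalStateException(
            "Leader changed during each of " + maxAttempts + " attempts.");
    }
}
```

Something like the `retrieveClassLoader(...)` call could be passed in as the action. This only narrows the window: a leader change right after the second check would still go unnoticed.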
> Refactor JobClientActor to watch already submitted jobs
> --------------------------------------------------------
>
> Key: FLINK-4273
> URL: https://issues.apache.org/jira/browse/FLINK-4273
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Minor
> Fix For: 1.2.0
>
>
> The JobClientActor assumes that it receives a job, submits it, and waits for
> the result. This process should be broken up into a submission process and a
> waiting process which can both be entered independently. This leads to two
> different entry points:
> 1) submit(job) -> wait
> 2) retrieve(jobID) -> wait