[
https://issues.apache.org/jira/browse/FLINK-4272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650987#comment-15650987
]
ASF GitHub Bot commented on FLINK-4272:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2732#discussion_r87102297
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
---
@@ -142,4 +209,66 @@ private ActorGateway getJobManager() throws
JobRetrievalException {
throw new JobRetrievalException(jobID, "Couldn't
retrieve leading JobManager.", e);
}
}
+
+ /**
+ * Reconstructs the class loader by first requesting information about
it at the JobManager
+ * and then downloading missing jar files.
+ * @param jobID id of job
+ * @param jobManager gateway to the JobManager
+ * @param config the flink configuration
+ * @return A classloader that should behave like the original
classloader
+ * @throws JobRetrievalException if anything goes wrong
+ */
+ private static ClassLoader retrieveClassLoader(
+ JobID jobID,
+ ActorGateway jobManager,
+ Configuration config)
+ throws JobRetrievalException {
+
+ final Object jmAnswer;
+ try {
+ jmAnswer = Await.result(
+ jobManager.ask(
+ new
JobManagerMessages.RequestClassloadingProps(jobID),
+
AkkaUtils.getDefaultTimeoutAsFiniteDuration()),
+ AkkaUtils.getDefaultTimeoutAsFiniteDuration());
+ } catch (Exception e) {
+ throw new JobRetrievalException(jobID, "Couldn't
retrieve class loading properties from JobManager.", e);
+ }
+
+ if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
+ JobManagerMessages.ClassloadingProps props =
((JobManagerMessages.ClassloadingProps) jmAnswer);
+
+ Option<String> jmHost =
jobManager.actor().path().address().host();
+ String jmHostname = jmHost.isDefined() ? jmHost.get() :
"localhost";
+ InetSocketAddress serverAddress = new
InetSocketAddress(jmHostname, props.blobManagerPort());
+ final BlobCache blobClient = new
BlobCache(serverAddress, config);
+
+ final List<BlobKey> requiredJarFiles =
props.requiredJarFiles();
+ final List<URL> requiredClasspaths =
props.requiredClasspaths();
+
+ final URL[] allURLs = new URL[requiredJarFiles.size() +
requiredClasspaths.size()];
+
+ int pos = 0;
+ for (BlobKey blobKey : props.requiredJarFiles()) {
+ try {
+ allURLs[pos++] =
blobClient.getURL(blobKey);
+ } catch (Exception e) {
+ blobClient.shutdown();
+ throw new JobRetrievalException(jobID,
"Failed to download BlobKey " + blobKey);
--- End diff --
Exception `e` is swallowed.
> Create a JobClient for job control and monitoring
> --------------------------------------------------
>
> Key: FLINK-4272
> URL: https://issues.apache.org/jira/browse/FLINK-4272
> Project: Flink
> Issue Type: New Feature
> Components: Client
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Minor
> Fix For: 1.2.0
>
>
> The aim of this new features is to expose a client to the user which allows
> to cancel a running job, retrieve accumulators for a running job, or perform
> other actions in the future. Let's call it {{JobClient}} for now (although
> this clashes with the existing JobClient class which could be renamed to
> JobClientActorUtils instead).
> The new client should be returned from the {{ClusterClient}} class upon job
> submission. The client should also be instantiatable by the users to retrieve
> the JobClient with a JobID.
> We should expose the new JobClient to the Java and Scala APIs using a new
> method on the {{ExecutionEnvironment}} / {{StreamExecutionEnvironment}}
> called {{executeWithControl()}} (perhaps we can find a better name).
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)