[ https://issues.apache.org/jira/browse/FLINK-2793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14943629#comment-14943629 ]
ASF GitHub Bot commented on FLINK-2793:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1202#discussion_r41167164
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerArchiveRetriever.java ---
@@ -70,28 +104,68 @@ public ActorGateway getArchiveGateway() {
 		return archiveGateway;
 	}
+	/**
+	 * Returns the current redirect address or <code>null</code> if the job manager associated with
+	 * this web monitor is leading. In that case, work with the gateway directly.
+	 */
+	public String getRedirectAddress() {
+		return redirectWebMonitorAddress;
+	}
 	@Override
 	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
 		if (leaderAddress != null && !leaderAddress.equals("")) {
 			try {
-				ActorRef jobManager = AkkaUtils.getActorRef(
-						leaderAddress,
-						actorSystem,
+				ActorRef jobManager = AkkaUtils.getActorRef(leaderAddress, actorSystem,
 						lookupTimeout);
+
 				jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
 				Future<Object> archiveFuture = jobManagerGateway.ask(
-						JobManagerMessages.getRequestArchive(),
-						timeout);
+						JobManagerMessages.getRequestArchive(), timeout);
 				ActorRef archive = ((JobManagerMessages.ResponseArchive) Await.result(
-						archiveFuture,
-						timeout)
-				).actor();
-
+						archiveFuture, timeout)).actor();
 				archiveGateway = new AkkaActorGateway(archive, leaderSessionID);
-			} catch (Exception e) {
+
+				if (jobManagerAkkaUrl == null) {
+					throw new IllegalStateException("Unspecified Akka URL for the job manager " +
+							"associated with this web monitor.");
+				}
+
+				boolean isLeader = jobManagerAkkaUrl.equals(leaderAddress);
+
+				if (isLeader) {
+					// Our JobManager is leader and our work is done :)
+					redirectWebMonitorAddress = null;
+				}
+				else {
+					// We need to redirect to the leader -.-
+					//
+					// This is necessary currently, because many execution graph structures are not
+					// serializable. The proper solution here is to have these serializable and
+					// transparently work with the leading job manager instead of redirecting.
+					Future<Object> portFuture = jobManagerGateway
+							.ask(JobManagerMessages.getRequestWebMonitorPort(), timeout);
+
+					JobManagerMessages.ResponseWebMonitorPort portResponse =
+							(JobManagerMessages.ResponseWebMonitorPort) Await.result(portFuture, timeout);
--- End diff --
Again, this is a blocking call (Await.result), which is bad. Better to use
futures here to avoid blocking.
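The reviewer's suggestion can be sketched as follows. This is a hypothetical, simplified example, not Flink code: it uses Java 8's CompletableFuture in place of the Scala futures and Await.result in the diff, and LeaderGateway, requestWebMonitorPort() and the leaderHost parameter are assumed stand-ins. Only the web monitor port request is shown. Instead of waiting for the leader's answer, notifyLeaderAddress registers a callback and returns at once; an answer that arrives after a newer leader notification is discarded.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Sketch of a non-blocking leader notification handler.
 * LeaderGateway is a hypothetical stand-in for the Akka ActorGateway in the diff.
 */
class NonBlockingLeaderRetriever {

    /** Hypothetical gateway: asks the leading job manager for its web monitor port. */
    interface LeaderGateway {
        CompletableFuture<Integer> requestWebMonitorPort();
    }

    /** Akka URL of the job manager this web monitor belongs to. */
    private final String ownJobManagerUrl;

    /** Leader address from the most recent notification (guarded by this). */
    private String currentLeader;

    /** "host:port" of the leader's web monitor, or null if our job manager leads. */
    private volatile String redirectAddress;

    NonBlockingLeaderRetriever(String ownJobManagerUrl) {
        this.ownJobManagerUrl = ownJobManagerUrl;
    }

    /** Read by request handlers; never blocks. */
    String getRedirectAddress() {
        return redirectAddress;
    }

    /**
     * Returns immediately. The returned future completes once the redirect address
     * reflects this notification, or exceptionally if the port request fails
     * (in which case the redirect address is left unchanged).
     */
    CompletableFuture<Void> notifyLeaderAddress(
            String leaderAddress, String leaderHost, LeaderGateway gateway) {
        synchronized (this) {
            currentLeader = leaderAddress;
            if (ownJobManagerUrl.equals(leaderAddress)) {
                // Our job manager leads: serve requests locally, no redirect.
                redirectAddress = null;
                return CompletableFuture.completedFuture(null);
            }
        }
        // No Await.result: the update runs as a callback once the port arrives.
        return gateway.requestWebMonitorPort()
                .thenAccept(port -> updateIfStillLeader(leaderAddress, leaderHost + ":" + port));
    }

    /** Applies the answer only if no newer leader was announced in the meantime. */
    private synchronized void updateIfStillLeader(String leaderAddress, String address) {
        if (leaderAddress.equals(currentLeader)) {
            redirectAddress = address;
        }
    }
}
```

Request handlers only read the volatile redirectAddress, so they never wait on the leader. The same composition is possible directly with Scala futures (for example with map or onComplete and an ExecutionContext) instead of converting to CompletableFuture.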
> Redirect to leading JobManager web frontend in non-standalone mode
> ------------------------------------------------------------------
>
> Key: FLINK-2793
> URL: https://issues.apache.org/jira/browse/FLINK-2793
> Project: Flink
> Issue Type: Improvement
> Components: JobManager
> Affects Versions: master
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
>
> In non-standalone recovery mode, the web frontend of a non-leading job
> manager shows the information of its associated job manager. Because that
> job manager is not the leader, nothing shows up.
> The web frontend cannot communicate directly with the leading job manager,
> because many job manager structures, such as the execution graph, are not
> serializable.
> A workaround is to redirect to the web frontend of the leading job manager.
> This ensures that all relevant information is presented.
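The redirect workaround described in the issue can be sketched as a small decision helper. This is an illustration under assumptions, not Flink's handler code: LeaderRedirect and redirectLocation are hypothetical names, the redirect address is assumed to be a "host:port" string that is null when the local job manager leads (as getRedirectAddress() returns in the diff), and plain HTTP is assumed.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;

/**
 * Hypothetical helper: decides whether a web request is served locally or
 * redirected to the web frontend of the leading job manager.
 */
class LeaderRedirect {

    /**
     * @param redirectAddress "host:port" of the leader's web frontend, or null
     *                        if the local job manager is the leader
     * @param requestUri      absolute request path with optional query, e.g. "/jobs?state=running"
     * @return the target for an HTTP redirect, or empty to serve the request locally
     */
    static Optional<String> redirectLocation(String redirectAddress, String requestUri) {
        if (redirectAddress == null) {
            return Optional.empty(); // the local job manager leads
        }
        URI request = URI.create(requestUri);
        int colon = redirectAddress.lastIndexOf(':');
        String host = redirectAddress.substring(0, colon);
        int port = Integer.parseInt(redirectAddress.substring(colon + 1));
        try {
            // Keep path and query so the user lands on the same page at the leader.
            URI location = new URI("http", null, host, port,
                    request.getPath(), request.getQuery(), null);
            return Optional.of(location.toString());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot build redirect URI", e);
        }
    }
}
```

A web handler would then respond with a redirect status (for example 307 Temporary Redirect) and put this value in the Location header; which status code to use is a design choice the issue does not specify.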
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)