GitHub user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4492#discussion_r131675749
--- Diff:
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
---
@@ -101,127 +98,112 @@ public void update() {
private void fetchMetrics() {
try {
- Option<scala.Tuple2<ActorGateway, Integer>>
jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
- if (jobManagerGatewayAndWebPort.isDefined()) {
- ActorGateway jobManager =
jobManagerGatewayAndWebPort.get()._1();
+ Optional<JobManagerGateway> optJobManagerGateway =
retriever.getJobManagerGatewayNow();
+ if (optJobManagerGateway.isPresent()) {
+ final JobManagerGateway jobManagerGateway =
optJobManagerGateway.get();
/**
* Remove all metrics that belong to a job that
is not running and no longer archived.
*/
- Future<Object> jobDetailsFuture =
jobManager.ask(new RequestJobDetails(true, true), timeout);
- jobDetailsFuture
- .onSuccess(new OnSuccess<Object>() {
- @Override
- public void onSuccess(Object
result) throws Throwable {
- MultipleJobsDetails
details = (MultipleJobsDetails) result;
+ CompletableFuture<MultipleJobsDetails>
jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
+
+ jobDetailsFuture.whenCompleteAsync(
+ (MultipleJobsDetails jobDetails,
Throwable throwable) -> {
+ if (throwable != null) {
+ LOG.debug("Fetching of
JobDetails failed.", throwable);
+ } else {
ArrayList<String>
toRetain = new ArrayList<>();
- for (JobDetails job :
details.getRunningJobs()) {
+ for (JobDetails job :
jobDetails.getRunningJobs()) {
toRetain.add(job.getJobId().toString());
}
- for (JobDetails job :
details.getFinishedJobs()) {
+ for (JobDetails job :
jobDetails.getFinishedJobs()) {
toRetain.add(job.getJobId().toString());
}
synchronized (metrics) {
metrics.jobs.keySet().retainAll(toRetain);
}
}
- }, ctx);
- logErrorOnFailure(jobDetailsFuture, "Fetching
of JobDetails failed.");
+ },
+ executor);
- String jobManagerPath = jobManager.path();
- String queryServicePath =
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) +
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
- ActorRef jobManagerQueryService =
actorSystem.actorFor(queryServicePath);
+ String jobManagerPath =
jobManagerGateway.getAddress();
+ String jmQueryServicePath =
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) +
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
- queryMetrics(jobManagerQueryService);
+ retrieveAndQueryMetrics(jmQueryServicePath);
/**
* We first request the list of all registered
task managers from the job manager, and then
* request the respective metric dump from each
task manager.
*
* <p>All stored metrics that do not belong to
a registered task manager will be removed.
*/
- Future<Object> registeredTaskManagersFuture =
jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
- registeredTaskManagersFuture
- .onSuccess(new OnSuccess<Object>() {
- @Override
- public void onSuccess(Object
result) throws Throwable {
- Iterable<Instance>
taskManagers = ((JobManagerMessages.RegisteredTaskManagers)
result).asJavaIterable();
- List<String>
activeTaskManagers = new ArrayList<>();
- for (Instance
taskManager : taskManagers) {
-
activeTaskManagers.add(taskManager.getId().toString());
-
- String
taskManagerPath = taskManager.getTaskManagerGateway().getAddress();
- String
queryServicePath = taskManagerPath.substring(0,
taskManagerPath.lastIndexOf('/') + 1) +
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" +
taskManager.getTaskManagerID().getResourceIdString();
- ActorRef
taskManagerQueryService = actorSystem.actorFor(queryServicePath);
-
-
queryMetrics(taskManagerQueryService);
- }
- synchronized (metrics)
{ // remove all metrics belonging to unregistered task managers
+ CompletableFuture<Collection<Instance>>
taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+ taskManagersFuture.whenCompleteAsync(
+ (Collection<Instance> taskManagers,
Throwable throwable) -> {
+ if (throwable != null) {
+ LOG.debug("Fetching
list of registered TaskManagers failed.", throwable);
+ } else {
+ List<String>
activeTaskManagers = taskManagers.stream().map(
+
taskManagerInstance -> {
+ final
String taskManagerAddress =
taskManagerInstance.getTaskManagerGateway().getAddress();
+ final
String tmQueryServicePath = taskManagerAddress.substring(0,
taskManagerAddress.lastIndexOf('/') + 1) +
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" +
taskManagerInstance.getTaskManagerID().getResourceIdString();
+
+
retrieveAndQueryMetrics(tmQueryServicePath);
+
+ return
taskManagerInstance.getId().toString();
+
}).collect(Collectors.toList());
+
+ synchronized (metrics) {
metrics.taskManagers.keySet().retainAll(activeTaskManagers);
}
}
- }, ctx);
- logErrorOnFailure(registeredTaskManagersFuture,
"Fetchin list of registered TaskManagers failed.");
+ },
+ executor);
}
} catch (Exception e) {
LOG.warn("Exception while fetching metrics.", e);
}
}
- private void logErrorOnFailure(Future<Object> future, final String
message) {
- future.onFailure(new OnFailure() {
- @Override
- public void onFailure(Throwable failure) throws
Throwable {
- LOG.debug(message, failure);
- }
- }, ctx);
- }
-
/**
- * Requests a metric dump from the given actor.
+ * Retrieves and queries the specified QueryServiceGateway.
*
- * @param actor ActorRef to request the dump from
- */
- private void queryMetrics(ActorRef actor) {
- Future<Object> metricQueryFuture = new
BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout);
- metricQueryFuture
- .onSuccess(new OnSuccess<Object>() {
- @Override
- public void onSuccess(Object result) throws
Throwable {
- addMetrics(result);
+ * @param queryServicePath specifying the QueryServiceGateway
+ */
+ private void retrieveAndQueryMetrics(String queryServicePath) {
+ final CompletableFuture<MetricQueryServiceGateway>
jmQueryServiceGatewayFuture =
queryServiceRetriever.retrieveService(queryServicePath);
--- End diff --
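The variable name `jmQueryServiceGatewayFuture` is misleading: `retrieveAndQueryMetrics` is also called for TaskManager query services (with `tmQueryServicePath`), not just the JobManager's. Consider a neutral name such as `queryServiceGatewayFuture`.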
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---