Repository: flink
Updated Branches:
  refs/heads/master 91d346e9e -> 31c0754ee


[FLINK-9016] [flip6] Properly unregister jobs from JobMetricGroup

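This commit unregisters a job from the JobManagerMetricGroup once the job
has reached a terminal state. To make this possible, the Dispatcher now owns
the JobManagerMetricGroup and hands each JobManagerRunner the per-job
JobManagerJobMetricGroup it obtains from addJob.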


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4a1d09c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4a1d09c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4a1d09c

Branch: refs/heads/master
Commit: c4a1d09ccdbd19416e15534ceba45ead5d2d6ed2
Parents: 91d346e
Author: Till Rohrmann <[email protected]>
Authored: Fri Mar 16 19:16:49 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sun Mar 18 14:25:00 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 33 ++++++++++++++------
 .../runtime/dispatcher/MiniDispatcher.java      |  8 +++--
 .../dispatcher/StandaloneDispatcher.java        |  8 +++--
 .../runtime/entrypoint/ClusterEntrypoint.java   | 27 ++++++++++++++--
 .../entrypoint/JobClusterEntrypoint.java        |  8 +++--
 .../entrypoint/SessionClusterEntrypoint.java    | 26 ++++++++-------
 .../runtime/jobmaster/JobManagerRunner.java     | 31 +++---------------
 .../flink/runtime/jobmaster/JobMaster.java      | 15 +++------
 .../flink/runtime/minicluster/MiniCluster.java  | 14 ++++++++-
 .../runtime/dispatcher/DispatcherTest.java      | 20 +++++++-----
 .../runtime/dispatcher/MiniDispatcherTest.java  | 10 +++---
 .../runtime/jobmaster/JobManagerRunnerTest.java | 10 ++----
 .../flink/runtime/jobmaster/JobMasterTest.java  |  3 +-
 13 files changed, 122 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 9b2411c..91a4f73 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -49,7 +49,8 @@ import 
org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceOverview;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
@@ -97,7 +98,6 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        private final JobManagerSharedServices jobManagerSharedServices;
        private final HeartbeatServices heartbeatServices;
        private final BlobServer blobServer;
-       private final MetricRegistry metricRegistry;
 
        private final FatalErrorHandler fatalErrorHandler;
 
@@ -109,6 +109,11 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
        private final JobManagerRunnerFactory jobManagerRunnerFactory;
 
+       private final JobManagerMetricGroup jobManagerMetricGroup;
+
+       @Nullable
+       private final String metricQueryServicePath;
+
        @Nullable
        protected final String restAddress;
 
@@ -123,7 +128,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                        ResourceManagerGateway resourceManagerGateway,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
+                       JobManagerMetricGroup jobManagerMetricGroup,
+                       @Nullable String metricServiceQueryPath,
                        ArchivedExecutionGraphStore archivedExecutionGraphStore,
                        JobManagerRunnerFactory jobManagerRunnerFactory,
                        FatalErrorHandler fatalErrorHandler,
@@ -135,9 +141,10 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                this.resourceManagerGateway = 
Preconditions.checkNotNull(resourceManagerGateway);
                this.heartbeatServices = 
Preconditions.checkNotNull(heartbeatServices);
                this.blobServer = Preconditions.checkNotNull(blobServer);
-               this.metricRegistry = 
Preconditions.checkNotNull(metricRegistry);
                this.fatalErrorHandler = 
Preconditions.checkNotNull(fatalErrorHandler);
                this.submittedJobGraphStore = 
Preconditions.checkNotNull(submittedJobGraphStore);
+               this.jobManagerMetricGroup = 
Preconditions.checkNotNull(jobManagerMetricGroup);
+               this.metricQueryServicePath = metricServiceQueryPath;
 
                this.jobManagerSharedServices = 
JobManagerSharedServices.fromConfiguration(
                        configuration,
@@ -192,6 +199,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                                        exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
                                }
 
+                               jobManagerMetricGroup.close();
+
                                if (exception != null) {
                                        throw exception;
                                } else {
@@ -251,7 +260,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                                        heartbeatServices,
                                        blobServer,
                                        jobManagerSharedServices,
-                                       metricRegistry,
+                                       jobManagerMetricGroup.addJob(jobGraph),
+                                       metricQueryServicePath,
                                        restAddress);
 
                                
jobManagerRunner.getResultFuture().whenCompleteAsync(
@@ -464,8 +474,6 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
        @Override
        public CompletableFuture<Collection<String>> 
requestMetricQueryServicePaths(Time timeout) {
-               final String metricQueryServicePath = 
metricRegistry.getMetricQueryServicePath();
-
                if (metricQueryServicePath != null) {
                        return 
CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath));
                } else {
@@ -513,6 +521,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                        
registerOrphanedJobManagerTerminationFuture(jobManagerRunnerTerminationFuture);
                }
 
+               jobManagerMetricGroup.removeJob(jobId);
+
                if (cleanupHA) {
                        submittedJobGraphStore.removeJobGraph(jobId);
                }
@@ -725,7 +735,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                        HeartbeatServices heartbeatServices,
                        BlobServer blobServer,
                        JobManagerSharedServices jobManagerServices,
-                       MetricRegistry metricRegistry,
+                       JobManagerJobMetricGroup jobManagerJobMetricGroup,
+                       @Nullable String metricQueryServicePath,
                        @Nullable String restAddress) throws Exception;
        }
 
@@ -745,7 +756,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                                HeartbeatServices heartbeatServices,
                                BlobServer blobServer,
                                JobManagerSharedServices jobManagerServices,
-                               MetricRegistry metricRegistry,
+                               JobManagerJobMetricGroup 
jobManagerJobMetricGroup,
+                               @Nullable String metricQueryServicePath,
                                @Nullable String restAddress) throws Exception {
                        return new JobManagerRunner(
                                resourceId,
@@ -756,7 +768,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                                heartbeatServices,
                                blobServer,
                                jobManagerServices,
-                               metricRegistry,
+                               jobManagerJobMetricGroup,
+                               metricQueryServicePath,
                                restAddress);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index c648131..3f45824 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -60,7 +60,8 @@ public class MiniDispatcher extends Dispatcher {
                        ResourceManagerGateway resourceManagerGateway,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
+                       JobManagerMetricGroup jobManagerMetricGroup,
+                       @Nullable String metricQueryServicePath,
                        ArchivedExecutionGraphStore archivedExecutionGraphStore,
                        JobManagerRunnerFactory jobManagerRunnerFactory,
                        FatalErrorHandler fatalErrorHandler,
@@ -76,7 +77,8 @@ public class MiniDispatcher extends Dispatcher {
                        resourceManagerGateway,
                        blobServer,
                        heartbeatServices,
-                       metricRegistry,
+                       jobManagerMetricGroup,
+                       metricQueryServicePath,
                        archivedExecutionGraphStore,
                        jobManagerRunnerFactory,
                        fatalErrorHandler,

http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index a7d21f3..52ac7a0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -45,7 +45,8 @@ public class StandaloneDispatcher extends Dispatcher {
                        ResourceManagerGateway resourceManagerGateway,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
+                       JobManagerMetricGroup jobManagerMetricGroup,
+                       @Nullable String metricQueryServicePath,
                        ArchivedExecutionGraphStore archivedExecutionGraphStore,
                        JobManagerRunnerFactory jobManagerRunnerFactory,
                        FatalErrorHandler fatalErrorHandler,
@@ -59,7 +60,8 @@ public class StandaloneDispatcher extends Dispatcher {
                        resourceManagerGateway,
                        blobServer,
                        heartbeatServices,
-                       metricRegistry,
+                       jobManagerMetricGroup,
+                       metricQueryServicePath,
                        archivedExecutionGraphStore,
                        jobManagerRunnerFactory,
                        fatalErrorHandler,

http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 07b3b68..676415b 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -51,6 +51,8 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
@@ -154,6 +156,9 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
        @GuardedBy("lock")
        private ClusterInformation clusterInformation;
 
+       @GuardedBy("lock")
+       private JobManagerMetricGroup jobManagerMetricGroup;
+
        protected ClusterEntrypoint(Configuration configuration) {
                this.configuration = Preconditions.checkNotNull(configuration);
                this.terminationFuture = new CompletableFuture<>();
@@ -327,6 +332,8 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                                clusterInformation,
                                webMonitorEndpoint.getRestAddress());
 
+                       jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, 
rpcService.getAddress());
+
                        dispatcher = createDispatcher(
                                configuration,
                                rpcService,
@@ -334,7 +341,8 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                                
resourceManager.getSelfGateway(ResourceManagerGateway.class),
                                blobServer,
                                heartbeatServices,
-                               metricRegistry,
+                               jobManagerMetricGroup,
+                               metricRegistry.getMetricQueryServicePath(),
                                archivedExecutionGraphStore,
                                this,
                                webMonitorEndpoint.getRestAddress());
@@ -488,7 +496,19 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                                
terminationFutures.add(FutureUtils.completedExceptionally(exception));
                        }
 
-                       return FutureUtils.completeAll(terminationFutures);
+                       final CompletableFuture<Void> 
componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
+
+                       if (jobManagerMetricGroup != null) {
+                               return FutureUtils.runAfterwards(
+                                       componentTerminationFuture,
+                                       () -> {
+                                               synchronized (lock) {
+                                                       
jobManagerMetricGroup.close();
+                                               }
+                                       });
+                       } else {
+                               return componentTerminationFuture;
+                       }
                }
        }
 
@@ -567,7 +587,8 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                ResourceManagerGateway resourceManagerGateway,
                BlobServer blobServer,
                HeartbeatServices heartbeatServices,
-               MetricRegistry metricRegistry,
+               JobManagerMetricGroup jobManagerMetricGroup,
+               @Nullable String metricQueryServicePath,
                ArchivedExecutionGraphStore archivedExecutionGraphStore,
                FatalErrorHandler fatalErrorHandler,
                @Nullable String restAddress) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index dc211d8..df950a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -32,7 +32,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
@@ -95,7 +95,8 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        ResourceManagerGateway resourceManagerGateway,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
+                       JobManagerMetricGroup jobManagerMetricGroup,
+                       @Nullable String metricQueryServicePath,
                        ArchivedExecutionGraphStore archivedExecutionGraphStore,
                        FatalErrorHandler fatalErrorHandler,
                        @Nullable String restAddress) throws Exception {
@@ -114,7 +115,8 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        resourceManagerGateway,
                        blobServer,
                        heartbeatServices,
-                       metricRegistry,
+                       jobManagerMetricGroup,
+                       metricQueryServicePath,
                        archivedExecutionGraphStore,
                        Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                        fatalErrorHandler,

http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 764356d..fcab796 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -34,7 +34,7 @@ import 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
@@ -104,16 +104,17 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
 
        @Override
        protected Dispatcher createDispatcher(
-               Configuration configuration,
-               RpcService rpcService,
-               HighAvailabilityServices highAvailabilityServices,
-               ResourceManagerGateway resourceManagerGateway,
-               BlobServer blobServer,
-               HeartbeatServices heartbeatServices,
-               MetricRegistry metricRegistry,
-               ArchivedExecutionGraphStore archivedExecutionGraphStore,
-               FatalErrorHandler fatalErrorHandler,
-               @Nullable String restAddress) throws Exception {
+                       Configuration configuration,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       ResourceManagerGateway resourceManagerGateway,
+                       BlobServer blobServer,
+                       HeartbeatServices heartbeatServices,
+                       JobManagerMetricGroup jobManagerMetricGroup,
+                       @Nullable String metricQueryServicePath,
+                       ArchivedExecutionGraphStore archivedExecutionGraphStore,
+                       FatalErrorHandler fatalErrorHandler,
+                       @Nullable String restAddress) throws Exception {
 
                // create the default dispatcher
                return new StandaloneDispatcher(
@@ -124,7 +125,8 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        resourceManagerGateway,
                        blobServer,
                        heartbeatServices,
-                       metricRegistry,
+                       jobManagerMetricGroup,
+                       metricQueryServicePath,
                        archivedExecutionGraphStore,
                        Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                        fatalErrorHandler,

http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 8b64f0d..cd2852b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -35,9 +35,7 @@ import 
org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.metrics.util.MetricUtils;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.AutoCloseableAsync;
@@ -82,8 +80,6 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
 
        private final JobMaster jobManager;
 
-       private final JobManagerMetricGroup jobManagerMetricGroup;
-
        private final Time rpcTimeout;
 
        private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
@@ -111,11 +107,10 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        final HeartbeatServices heartbeatServices,
                        final BlobServer blobServer,
                        final JobManagerSharedServices jobManagerSharedServices,
-                       final MetricRegistry metricRegistry,
+                       final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+                       @Nullable final String metricQueryServicePath,
                        @Nullable final String restAddress) throws Exception {
 
-               JobManagerMetricGroup jobManagerMetrics = null;
-
                this.resultFuture = new CompletableFuture<>();
                this.terminationFuture = new CompletableFuture<>();
 
@@ -126,10 +121,6 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
 
                        checkArgument(jobGraph.getNumberOfVertices() > 0, "The 
given job is empty");
 
-                       final String hostAddress = 
rpcService.getAddress().isEmpty() ? "localhost" : rpcService.getAddress();
-                       jobManagerMetrics = 
MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, hostAddress);
-                       this.jobManagerMetricGroup = jobManagerMetrics;
-
                        // libraries and class loader first
                        final LibraryCacheManager libraryCacheManager = 
jobManagerSharedServices.getLibraryCacheManager();
                        try {
@@ -162,19 +153,14 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                                jobManagerSharedServices,
                                heartbeatServices,
                                blobServer,
-                               jobManagerMetrics,
+                               jobManagerJobMetricGroup,
                                this,
                                this,
                                userCodeLoader,
                                restAddress,
-                               metricRegistry.getMetricQueryServicePath());
+                               metricQueryServicePath);
                }
                catch (Throwable t) {
-                       // clean up everything
-                       if (jobManagerMetrics != null) {
-                               jobManagerMetrics.close();
-                       }
-
                        terminationFuture.completeExceptionally(t);
                        resultFuture.completeExceptionally(t);
 
@@ -230,13 +216,6 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                                                        throwable = 
ExceptionUtils.firstOrSuppressed(t, 
ExceptionUtils.stripCompletionException(throwable));
                                                }
 
-                                               // make all registered metrics 
go away
-                                               try {
-                                                       
jobManagerMetricGroup.close();
-                                               } catch (Throwable t) {
-                                                       throwable = 
ExceptionUtils.firstOrSuppressed(t, throwable);
-                                               }
-
                                                if (throwable != null) {
                                                        
terminationFuture.completeExceptionally(
                                                                new 
FlinkException("Could not properly shut down the JobManagerRunner", throwable));

http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 74f9b65..ced8c7c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
@@ -76,8 +75,7 @@ import 
org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
@@ -165,7 +163,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
        private final BlobServer blobServer;
 
        /** The metrics for the job. */
-       private final MetricGroup jobMetricGroup;
+       private final JobManagerJobMetricGroup jobMetricGroup;
 
        /** The heartbeat manager with task managers. */
        private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;
@@ -225,7 +223,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        JobManagerSharedServices jobManagerSharedServices,
                        HeartbeatServices heartbeatServices,
                        BlobServer blobServer,
-                       @Nullable JobManagerMetricGroup jobManagerMetricGroup,
+                       JobManagerJobMetricGroup jobMetricGroup,
                        OnCompletionActions jobCompletionActions,
                        FatalErrorHandler errorHandler,
                        ClassLoader userCodeLoader,
@@ -246,6 +244,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                this.jobCompletionActions = checkNotNull(jobCompletionActions);
                this.errorHandler = checkNotNull(errorHandler);
                this.userCodeLoader = checkNotNull(userCodeLoader);
+               this.jobMetricGroup = checkNotNull(jobMetricGroup);
 
                this.taskManagerHeartbeatManager = 
heartbeatServices.createHeartbeatManagerSender(
                        resourceId,
@@ -262,12 +261,6 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                final String jobName = jobGraph.getName();
                final JobID jid = jobGraph.getJobID();
 
-               if (jobManagerMetricGroup != null) {
-                       this.jobMetricGroup = jobManagerMetricGroup.addJob(jobGraph);
-               } else {
-                       this.jobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
-               }
-
                log.info("Initializing job {} ({}).", jobName, jid);
 
                final RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration =

http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 98c8ca2..d660c67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -50,6 +50,8 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
@@ -154,6 +156,9 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
        private StandaloneDispatcher dispatcher;
 
        @GuardedBy("lock")
+       private JobManagerMetricGroup jobManagerMetricGroup;
+
+       @GuardedBy("lock")
        private RpcGatewayRetriever<DispatcherId, DispatcherGateway> 
dispatcherGatewayRetriever;
 
        /** Flag marking the mini cluster as started/running. */
@@ -344,6 +349,8 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                                // bring up the dispatcher that launches 
JobManagers when jobs submitted
                                LOG.info("Starting job dispatcher(s) for 
JobManger");
 
+                               this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost");
+
                                dispatcher = new StandaloneDispatcher(
                                        jobManagerRpcService,
                                        Dispatcher.DISPATCHER_NAME + 
UUID.randomUUID(),
@@ -352,7 +359,8 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                                        
resourceManagerRunner.getResourceManageGateway(),
                                        blobServer,
                                        heartbeatServices,
-                                       metricRegistry,
+                                       jobManagerMetricGroup,
+                                       
metricRegistry.getMetricQueryServicePath(),
                                        new MemoryArchivedExecutionGraphStore(),
                                        
Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                                        new ShutDownFatalErrorHandler(),
@@ -424,6 +432,10 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                                                componentsTerminationFuture,
                                                () -> {
                                                        synchronized (lock) {
+                                                               if (jobManagerMetricGroup != null) {
+                                                                       jobManagerMetricGroup.close();
+                                                                       jobManagerMetricGroup = null;
+                                                               }
                                                                // metrics 
shutdown
                                                                if 
(metricRegistry != null) {
                                                                        
metricRegistry.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 8267921..71c391f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -45,8 +45,9 @@ import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -177,7 +178,8 @@ public class DispatcherTest extends TestLogger {
                        mock(ResourceManagerGateway.class),
                        new BlobServer(blobServerConfig, new VoidBlobStore()),
                        heartbeatServices,
-                       NoOpMetricRegistry.INSTANCE,
+                       
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+                       null,
                        new MemoryArchivedExecutionGraphStore(),
                        fatalErrorHandler,
                        TEST_JOB_ID);
@@ -360,7 +362,8 @@ public class DispatcherTest extends TestLogger {
                                ResourceManagerGateway resourceManagerGateway,
                                BlobServer blobServer,
                                HeartbeatServices heartbeatServices,
-                               MetricRegistry metricRegistry,
+                               JobManagerMetricGroup jobManagerMetricGroup,
+                               @Nullable String metricQueryServicePath,
                                ArchivedExecutionGraphStore 
archivedExecutionGraphStore,
                                FatalErrorHandler fatalErrorHandler,
                                JobID expectedJobId) throws Exception {
@@ -373,7 +376,8 @@ public class DispatcherTest extends TestLogger {
                                resourceManagerGateway,
                                blobServer,
                                heartbeatServices,
-                               metricRegistry,
+                               jobManagerMetricGroup,
+                               metricQueryServicePath,
                                archivedExecutionGraphStore,
                                new 
ExpectedJobIdJobManagerRunnerFactory(expectedJobId),
                                fatalErrorHandler,
@@ -421,7 +425,8 @@ public class DispatcherTest extends TestLogger {
                                HeartbeatServices heartbeatServices,
                                BlobServer blobServer,
                                JobManagerSharedServices 
jobManagerSharedServices,
-                               MetricRegistry metricRegistry,
+                               JobManagerJobMetricGroup 
jobManagerJobMetricGroup,
+                               @Nullable String metricQueryServicePath,
                                @Nullable String restAddress) throws Exception {
                        assertEquals(expectedJobId, jobGraph.getJobID());
 
@@ -434,7 +439,8 @@ public class DispatcherTest extends TestLogger {
                                heartbeatServices,
                                blobServer,
                                jobManagerSharedServices,
-                               metricRegistry,
+                               jobManagerJobMetricGroup,
+                               metricQueryServicePath,
                                restAddress);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index c6eda2e..651200f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -35,8 +35,8 @@ import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -254,7 +254,8 @@ public class MiniDispatcherTest extends TestLogger {
                        resourceManagerGateway,
                        blobServer,
                        heartbeatServices,
-                       NoOpMetricRegistry.INSTANCE,
+                       
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+                       null,
                        archivedExecutionGraphStore,
                        testingJobManagerRunnerFactory,
                        testingFatalErrorHandler,
@@ -283,7 +284,8 @@ public class MiniDispatcherTest extends TestLogger {
                                HeartbeatServices heartbeatServices,
                                BlobServer blobServer,
                                JobManagerSharedServices 
jobManagerSharedServices,
-                               MetricRegistry metricRegistry,
+                               JobManagerJobMetricGroup 
jobManagerJobMetricGroup,
+                               @Nullable String metricQueryServicePath,
                                @Nullable String restAddress) throws Exception {
                        jobGraphFuture.complete(jobGraph);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
index 9730dde..1d7f090 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
@@ -32,8 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -76,8 +75,6 @@ public class JobManagerRunnerTest extends TestLogger {
 
        private static JobManagerSharedServices jobManagerSharedServices;
 
-       private static MetricRegistry metricRegistry;
-
        private static JobGraph jobGraph;
 
        private static ArchivedExecutionGraph archivedExecutionGraph;
@@ -97,8 +94,6 @@ public class JobManagerRunnerTest extends TestLogger {
 
                jobManagerSharedServices = 
JobManagerSharedServices.fromConfiguration(configuration, blobServer);
 
-               metricRegistry = NoOpMetricRegistry.INSTANCE;
-
                final JobVertex jobVertex = new JobVertex("Test vertex");
                jobVertex.setInvokableClass(NoOpInvokable.class);
                jobGraph = new JobGraph(jobVertex);
@@ -215,7 +210,8 @@ public class JobManagerRunnerTest extends TestLogger {
                        heartbeatServices,
                        blobServer,
                        jobManagerSharedServices,
-                       metricRegistry,
+                       
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
+                       null,
                        null);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index b543056..ed5a894 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -47,6 +47,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
@@ -410,7 +411,7 @@ public class JobMasterTest extends TestLogger {
                        jobManagerSharedServices,
                        fastHeartbeatServices,
                        blobServer,
-                       null,
+                       
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
                        new NoOpOnCompletionActions(),
                        testingFatalErrorHandler,
                        JobMasterTest.class.getClassLoader(),

Reply via email to