Repository: flink Updated Branches: refs/heads/master 91d346e9e -> 31c0754ee
[FLINK-9016] [flip6] Properly unregister jobs from JobMetricGroup This commit properly removes jobs from the JobMetricGroup once a job has reached a terminal state. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4a1d09c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4a1d09c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4a1d09c Branch: refs/heads/master Commit: c4a1d09ccdbd19416e15534ceba45ead5d2d6ed2 Parents: 91d346e Author: Till Rohrmann <[email protected]> Authored: Fri Mar 16 19:16:49 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Sun Mar 18 14:25:00 2018 +0100 ---------------------------------------------------------------------- .../flink/runtime/dispatcher/Dispatcher.java | 33 ++++++++++++++------ .../runtime/dispatcher/MiniDispatcher.java | 8 +++-- .../dispatcher/StandaloneDispatcher.java | 8 +++-- .../runtime/entrypoint/ClusterEntrypoint.java | 27 ++++++++++++++-- .../entrypoint/JobClusterEntrypoint.java | 8 +++-- .../entrypoint/SessionClusterEntrypoint.java | 26 ++++++++------- .../runtime/jobmaster/JobManagerRunner.java | 31 +++--------------- .../flink/runtime/jobmaster/JobMaster.java | 15 +++------ .../flink/runtime/minicluster/MiniCluster.java | 14 ++++++++- .../runtime/dispatcher/DispatcherTest.java | 20 +++++++----- .../runtime/dispatcher/MiniDispatcherTest.java | 10 +++--- .../runtime/jobmaster/JobManagerRunnerTest.java | 10 ++---- .../flink/runtime/jobmaster/JobMasterTest.java | 3 +- 13 files changed, 122 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 9b2411c..91a4f73 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -49,7 +49,8 @@ import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.JobsOverview; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceOverview; import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse; @@ -97,7 +98,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme private final JobManagerSharedServices jobManagerSharedServices; private final HeartbeatServices heartbeatServices; private final BlobServer blobServer; - private final MetricRegistry metricRegistry; private final FatalErrorHandler fatalErrorHandler; @@ -109,6 +109,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme private final JobManagerRunnerFactory jobManagerRunnerFactory; + private final JobManagerMetricGroup jobManagerMetricGroup; + + @Nullable + private final String metricQueryServicePath; + @Nullable protected final String restAddress; @@ -123,7 +128,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, + JobManagerMetricGroup jobManagerMetricGroup, + @Nullable String metricServiceQueryPath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @@ -135,9 +141,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme this.resourceManagerGateway = Preconditions.checkNotNull(resourceManagerGateway); this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices); this.blobServer = Preconditions.checkNotNull(blobServer); - this.metricRegistry = Preconditions.checkNotNull(metricRegistry); this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); this.submittedJobGraphStore = Preconditions.checkNotNull(submittedJobGraphStore); + this.jobManagerMetricGroup = Preconditions.checkNotNull(jobManagerMetricGroup); + this.metricQueryServicePath = metricServiceQueryPath; this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration( configuration, @@ -192,6 +199,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme exception = ExceptionUtils.firstOrSuppressed(e, exception); } + jobManagerMetricGroup.close(); + if (exception != null) { throw exception; } else { @@ -251,7 +260,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme heartbeatServices, blobServer, jobManagerSharedServices, - metricRegistry, + jobManagerMetricGroup.addJob(jobGraph), + metricQueryServicePath, restAddress); jobManagerRunner.getResultFuture().whenCompleteAsync( @@ -464,8 +474,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme @Override public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) { - final String metricQueryServicePath = metricRegistry.getMetricQueryServicePath(); - if (metricQueryServicePath != null) { return CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath)); } else { @@ -513,6 +521,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme registerOrphanedJobManagerTerminationFuture(jobManagerRunnerTerminationFuture); } + jobManagerMetricGroup.removeJob(jobId); + if (cleanupHA) { submittedJobGraphStore.removeJobGraph(jobId); } @@ -725,7 +735,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerServices, - MetricRegistry metricRegistry, + JobManagerJobMetricGroup jobManagerJobMetricGroup, + @Nullable String metricQueryServicePath, @Nullable String restAddress) throws Exception; } @@ -745,7 +756,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerServices, - MetricRegistry metricRegistry, + JobManagerJobMetricGroup jobManagerJobMetricGroup, + @Nullable String metricQueryServicePath, @Nullable String restAddress) throws Exception { return new JobManagerRunner( resourceId, @@ -756,7 +768,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme heartbeatServices, blobServer, jobManagerServices, - metricRegistry, + jobManagerJobMetricGroup, + metricQueryServicePath, restAddress); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java index c648131..3f45824 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -60,7 +60,8 @@ public class MiniDispatcher extends Dispatcher { ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, + JobManagerMetricGroup jobManagerMetricGroup, + @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @@ -76,7 +77,8 @@ public class MiniDispatcher extends Dispatcher { resourceManagerGateway, blobServer, heartbeatServices, - metricRegistry, + jobManagerMetricGroup, + metricQueryServicePath, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler, http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java index a7d21f3..52ac7a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java @@ -24,7 +24,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobMaster; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -45,7 +45,8 @@ public class StandaloneDispatcher extends Dispatcher { ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, + JobManagerMetricGroup jobManagerMetricGroup, + @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @@ -59,7 +60,8 @@ public class StandaloneDispatcher extends Dispatcher { resourceManagerGateway, blobServer, heartbeatServices, - metricRegistry, + jobManagerMetricGroup, + metricQueryServicePath, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler, http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 07b3b68..676415b 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -51,6 +51,8 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.util.MetricUtils; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; @@ -154,6 +156,9 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { @GuardedBy("lock") private ClusterInformation clusterInformation; + @GuardedBy("lock") + private JobManagerMetricGroup jobManagerMetricGroup; + protected ClusterEntrypoint(Configuration configuration) { this.configuration = Preconditions.checkNotNull(configuration); this.terminationFuture = new CompletableFuture<>(); @@ -327,6 +332,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { clusterInformation, webMonitorEndpoint.getRestAddress()); + jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress()); + dispatcher = createDispatcher( configuration, rpcService, @@ -334,7 +341,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { resourceManager.getSelfGateway(ResourceManagerGateway.class), blobServer, heartbeatServices, - metricRegistry, + jobManagerMetricGroup, + metricRegistry.getMetricQueryServicePath(), archivedExecutionGraphStore, this, webMonitorEndpoint.getRestAddress()); @@ -488,7 +496,19 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { terminationFutures.add(FutureUtils.completedExceptionally(exception)); } - return FutureUtils.completeAll(terminationFutures); + final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures); + + if (jobManagerMetricGroup != null) { + return FutureUtils.runAfterwards( + componentTerminationFuture, + () -> { + synchronized (lock) { + jobManagerMetricGroup.close(); + } + }); + } else { + return componentTerminationFuture; + } } } @@ -567,7 +587,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, + JobManagerMetricGroup jobManagerMetricGroup, + @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress) throws Exception; http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java index dc211d8..df950a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint; import org.apache.flink.runtime.leaderelection.LeaderElectionService; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; @@ -95,7 +95,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, + JobManagerMetricGroup jobManagerMetricGroup, + @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress) throws Exception { @@ -114,7 +115,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { resourceManagerGateway, blobServer, heartbeatServices, - metricRegistry, + jobManagerMetricGroup, + metricQueryServicePath, archivedExecutionGraphStore, Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, fatalErrorHandler, http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java index 764356d..fcab796 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.dispatcher.StandaloneDispatcher; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.leaderelection.LeaderElectionService; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; @@ -104,16 +104,17 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { @Override protected Dispatcher createDispatcher( - Configuration configuration, - RpcService rpcService, - HighAvailabilityServices highAvailabilityServices, - ResourceManagerGateway resourceManagerGateway, - BlobServer blobServer, - HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, - ArchivedExecutionGraphStore archivedExecutionGraphStore, - FatalErrorHandler fatalErrorHandler, - @Nullable String restAddress) throws Exception { + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + ResourceManagerGateway resourceManagerGateway, + BlobServer blobServer, + HeartbeatServices heartbeatServices, + JobManagerMetricGroup jobManagerMetricGroup, + @Nullable String metricQueryServicePath, + ArchivedExecutionGraphStore archivedExecutionGraphStore, + FatalErrorHandler fatalErrorHandler, + @Nullable String restAddress) throws Exception { // create the default dispatcher return new StandaloneDispatcher( @@ -124,7 +125,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { resourceManagerGateway, blobServer, heartbeatServices, - metricRegistry, + jobManagerMetricGroup, + metricQueryServicePath, archivedExecutionGraphStore, Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, fatalErrorHandler, http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 8b64f0d..cd2852b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -35,9 +35,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; -import org.apache.flink.runtime.metrics.util.MetricUtils; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.AutoCloseableAsync; @@ -82,8 +80,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F private final JobMaster jobManager; - private final JobManagerMetricGroup jobManagerMetricGroup; - private final Time rpcTimeout; private final CompletableFuture<ArchivedExecutionGraph> resultFuture; @@ -111,11 +107,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F final HeartbeatServices heartbeatServices, final BlobServer blobServer, final JobManagerSharedServices jobManagerSharedServices, - final MetricRegistry metricRegistry, + final JobManagerJobMetricGroup jobManagerJobMetricGroup, + @Nullable final String metricQueryServicePath, @Nullable final String restAddress) throws Exception { - JobManagerMetricGroup jobManagerMetrics = null; - this.resultFuture = new CompletableFuture<>(); this.terminationFuture = new CompletableFuture<>(); @@ -126,10 +121,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty"); - final String hostAddress = rpcService.getAddress().isEmpty() ? "localhost" : rpcService.getAddress(); - jobManagerMetrics = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, hostAddress); - this.jobManagerMetricGroup = jobManagerMetrics; - // libraries and class loader first final LibraryCacheManager libraryCacheManager = jobManagerSharedServices.getLibraryCacheManager(); try { @@ -162,19 +153,14 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F jobManagerSharedServices, heartbeatServices, blobServer, - jobManagerMetrics, + jobManagerJobMetricGroup, this, this, userCodeLoader, restAddress, - metricRegistry.getMetricQueryServicePath()); + metricQueryServicePath); } catch (Throwable t) { - // clean up everything - if (jobManagerMetrics != null) { - jobManagerMetrics.close(); - } - terminationFuture.completeExceptionally(t); resultFuture.completeExceptionally(t); @@ -230,13 +216,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F throwable = ExceptionUtils.firstOrSuppressed(t, ExceptionUtils.stripCompletionException(throwable)); } - // make all registered metrics go away - try { - jobManagerMetricGroup.close(); - } catch (Throwable t) { - throwable = ExceptionUtils.firstOrSuppressed(t, throwable); - } - if (throwable != null) { terminationFuture.completeExceptionally( new FlinkException("Could not properly shut down the JobManagerRunner", throwable)); http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 74f9b65..ced8c7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.metrics.MetricGroup; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.StoppingException; @@ -76,8 +75,7 @@ import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.webmonitor.JobDetails; -import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.query.UnknownKvStateLocation; @@ -165,7 +163,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast private final BlobServer blobServer; /** The metrics for the job. */ - private final MetricGroup jobMetricGroup; + private final JobManagerJobMetricGroup jobMetricGroup; /** The heartbeat manager with task managers. */ private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager; @@ -225,7 +223,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast JobManagerSharedServices jobManagerSharedServices, HeartbeatServices heartbeatServices, BlobServer blobServer, - @Nullable JobManagerMetricGroup jobManagerMetricGroup, + JobManagerJobMetricGroup jobMetricGroup, OnCompletionActions jobCompletionActions, FatalErrorHandler errorHandler, ClassLoader userCodeLoader, @@ -246,6 +244,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast this.jobCompletionActions = checkNotNull(jobCompletionActions); this.errorHandler = checkNotNull(errorHandler); this.userCodeLoader = checkNotNull(userCodeLoader); + this.jobMetricGroup = checkNotNull(jobMetricGroup); this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender( resourceId, @@ -262,12 +261,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast final String jobName = jobGraph.getName(); final JobID jid = jobGraph.getJobID(); - if (jobManagerMetricGroup != null) { - this.jobMetricGroup = jobManagerMetricGroup.addJob(jobGraph); - } else { - this.jobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(); - } - log.info("Initializing job {} ({}).", jobName, jid); final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration = http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 98c8ca2..d660c67 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -50,6 +50,8 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.util.MetricUtils; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner; @@ -154,6 +156,9 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { private StandaloneDispatcher dispatcher; @GuardedBy("lock") + private JobManagerMetricGroup jobManagerMetricGroup; + + @GuardedBy("lock") private RpcGatewayRetriever<DispatcherId, DispatcherGateway> dispatcherGatewayRetriever; /** Flag marking the mini cluster as started/running. */ @@ -344,6 +349,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { // bring up the dispatcher that launches JobManagers when jobs submitted LOG.info("Starting job dispatcher(s) for JobManger"); + this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost"); + dispatcher = new StandaloneDispatcher( jobManagerRpcService, Dispatcher.DISPATCHER_NAME + UUID.randomUUID(), @@ -352,7 +359,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { resourceManagerRunner.getResourceManageGateway(), blobServer, heartbeatServices, - metricRegistry, + jobManagerMetricGroup, + metricRegistry.getMetricQueryServicePath(), new MemoryArchivedExecutionGraphStore(), Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, new ShutDownFatalErrorHandler(), @@ -424,6 +432,10 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { componentsTerminationFuture, () -> { synchronized (lock) { + if (jobManagerMetricGroup != null) { + jobManagerMetricGroup.close(); + jobManagerMetricGroup = null; + } // metrics shutdown if (metricRegistry != null) { metricRegistry.shutdown(); http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 8267921..71c391f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -45,8 +45,9 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; -import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -177,7 +178,8 @@ public class DispatcherTest extends TestLogger { mock(ResourceManagerGateway.class), new BlobServer(blobServerConfig, new VoidBlobStore()), heartbeatServices, - NoOpMetricRegistry.INSTANCE, + UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), + null, new MemoryArchivedExecutionGraphStore(), fatalErrorHandler, TEST_JOB_ID); @@ -360,7 +362,8 @@ public class DispatcherTest extends TestLogger { ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, + JobManagerMetricGroup jobManagerMetricGroup, + @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, FatalErrorHandler fatalErrorHandler, JobID expectedJobId) throws Exception { @@ -373,7 +376,8 @@ public class DispatcherTest extends TestLogger { resourceManagerGateway, blobServer, heartbeatServices, - metricRegistry, + jobManagerMetricGroup, + metricQueryServicePath, archivedExecutionGraphStore, new ExpectedJobIdJobManagerRunnerFactory(expectedJobId), fatalErrorHandler, @@ -421,7 +425,8 @@ public class DispatcherTest extends TestLogger { HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, - MetricRegistry metricRegistry, + JobManagerJobMetricGroup jobManagerJobMetricGroup, + @Nullable String metricQueryServicePath, @Nullable String restAddress) throws Exception { assertEquals(expectedJobId, jobGraph.getJobID()); @@ -434,7 +439,8 @@ public class DispatcherTest extends TestLogger { heartbeatServices, blobServer, jobManagerSharedServices, - metricRegistry, + jobManagerJobMetricGroup, + metricQueryServicePath, restAddress); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index c6eda2e..651200f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -35,8 +35,8 @@ import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.rpc.RpcService; @@ -254,7 +254,8 @@ public class MiniDispatcherTest extends TestLogger { resourceManagerGateway, blobServer, heartbeatServices, - NoOpMetricRegistry.INSTANCE, + UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), + null, archivedExecutionGraphStore, testingJobManagerRunnerFactory, testingFatalErrorHandler, @@ -283,7 +284,8 @@ public class MiniDispatcherTest extends TestLogger { HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, - MetricRegistry metricRegistry, + JobManagerJobMetricGroup jobManagerJobMetricGroup, + @Nullable String metricQueryServicePath, @Nullable String restAddress) throws Exception { jobGraphFuture.complete(jobGraph); http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java index 9730dde..1d7f090 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java @@ -32,8 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; -import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.testtasks.NoOpInvokable; @@ -76,8 +75,6 @@ public class JobManagerRunnerTest extends TestLogger { private static JobManagerSharedServices jobManagerSharedServices; - private static MetricRegistry metricRegistry; - private static JobGraph jobGraph; private static ArchivedExecutionGraph archivedExecutionGraph; @@ -97,8 +94,6 @@ public class JobManagerRunnerTest extends TestLogger { jobManagerSharedServices = JobManagerSharedServices.fromConfiguration(configuration, blobServer); - metricRegistry = NoOpMetricRegistry.INSTANCE; - final JobVertex jobVertex = new JobVertex("Test vertex"); jobVertex.setInvokableClass(NoOpInvokable.class); jobGraph = new JobGraph(jobVertex); @@ -215,7 +210,8 @@ public class JobManagerRunnerTest extends TestLogger { heartbeatServices, blobServer, jobManagerSharedServices, - metricRegistry, + UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(), + null, null); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c4a1d09c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index b543056..ed5a894 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; @@ -410,7 +411,7 @@ public class JobMasterTest extends TestLogger { jobManagerSharedServices, fastHeartbeatServices, blobServer, - null, + UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(), new NoOpOnCompletionActions(), testingFatalErrorHandler, JobMasterTest.class.getClassLoader(),
