[FLINK-7876] Properly start and shut down MetricRegistry by ClusterEntrypoint
Suppress MetricRegistry#shutdown exceptions if the metric query service actor's actor system has already been shut down. Address PR comments Pull out TaskManagerMetricGroup instantiation from TaskManagerServices Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad42ee27 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad42ee27 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad42ee27 Branch: refs/heads/master Commit: ad42ee27decb7c563e58ef090373ae8648ca8e81 Parents: d45b941 Author: Till <[email protected]> Authored: Fri Oct 20 12:13:10 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Wed Nov 1 15:52:01 2017 +0100 ---------------------------------------------------------------------- .../entrypoint/MesosJobClusterEntrypoint.java | 12 ++++-- .../MesosSessionClusterEntrypoint.java | 12 ++++-- .../MesosApplicationMasterRunner.java | 6 ++- .../clusterframework/MesosResourceManager.java | 4 +- .../clusterframework/MesosJobManager.scala | 1 - .../MesosResourceManagerTest.java | 5 ++- .../ScheduledDropwizardReporterTest.java | 2 +- .../DropwizardFlinkHistogramWrapperTest.java | 2 +- .../flink/metrics/jmx/JMXReporterTest.java | 2 +- .../metrics/statsd/StatsDReporterTest.java | 2 +- .../flink/runtime/dispatcher/Dispatcher.java | 22 ++--------- .../dispatcher/StandaloneDispatcher.java | 6 +-- .../runtime/entrypoint/ClusterEntrypoint.java | 8 +++- .../entrypoint/JobClusterEntrypoint.java | 8 ++-- .../entrypoint/SessionClusterEntrypoint.java | 8 ++-- .../StandaloneSessionClusterEntrypoint.java | 4 +- .../runtime/jobmaster/JobManagerRunner.java | 4 +- .../flink/runtime/metrics/MetricRegistry.java | 16 ++++++++ .../runtime/metrics/MetricRegistryImpl.java | 12 +++++- .../flink/runtime/metrics/util/MetricUtils.java | 19 +++++++++ .../flink/runtime/minicluster/MiniCluster.java | 23 ++++++----- .../minicluster/MiniClusterJobDispatcher.java | 8 ++-- 
.../minicluster/StandaloneMiniCluster.java | 8 ++++ .../resourcemanager/ResourceManager.java | 6 +-- .../resourcemanager/ResourceManagerGateway.java | 2 +- .../resourcemanager/ResourceManagerRunner.java | 4 +- .../StandaloneResourceManager.java | 4 +- .../handler/legacy/TaskManagerLogHandler.java | 37 +++++++++++------- .../runtime/taskexecutor/TaskExecutor.java | 6 --- .../runtime/taskexecutor/TaskManagerRunner.java | 41 ++++++++++++-------- .../taskexecutor/TaskManagerServices.java | 26 +------------ .../runtime/minicluster/FlinkMiniCluster.scala | 11 ++++-- .../minicluster/LocalFlinkMiniCluster.scala | 29 ++++++-------- .../flink/runtime/taskmanager/TaskManager.scala | 18 +++++++-- .../runtime/dispatcher/DispatcherTest.java | 5 ++- .../jobmanager/JobManagerHARecoveryTest.java | 8 ++-- .../JobManagerLeaderElectionTest.java | 29 +++++++------- .../runtime/metrics/NoOpMetricRegistry.java | 8 ++++ .../runtime/metrics/TaskManagerMetricsTest.java | 12 ++++-- .../testutils/UnregisteredTaskMetricsGroup.java | 10 ++--- .../legacy/metrics/MetricFetcherTest.java | 4 +- .../taskexecutor/TaskExecutorITCase.java | 1 - .../runtime/taskexecutor/TaskExecutorTest.java | 25 ++++++------ ...askManagerComponentsStartupShutdownTest.java | 25 ++++++------ .../runtime/testingUtils/TestingCluster.scala | 1 - .../test/checkpointing/SavepointITCase.java | 1 + .../flink/yarn/YarnApplicationMasterRunner.java | 6 ++- .../apache/flink/yarn/YarnResourceManager.java | 4 +- .../entrypoint/YarnJobClusterEntrypoint.java | 4 +- .../YarnSessionClusterEntrypoint.java | 4 +- 50 files changed, 300 insertions(+), 225 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java index b98adff..2fe99de 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; @@ -112,7 +112,13 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { } @Override - protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistryImpl metricRegistry) throws Exception { + protected void startClusterComponents( + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + BlobServer blobServer, + HeartbeatServices heartbeatServices, + MetricRegistry metricRegistry) throws Exception { super.startClusterComponents(configuration, rpcService, highAvailabilityServices, blobServer, heartbeatServices, metricRegistry); } @@ -123,7 +129,7 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, - 
MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler) throws Exception { final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java index 0cf0fce..b8d9f65 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; @@ -102,7 +102,13 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint { } @Override - protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices 
heartbeatServices, MetricRegistryImpl metricRegistry) throws Exception { + protected void startClusterComponents( + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + BlobServer blobServer, + HeartbeatServices heartbeatServices, + MetricRegistry metricRegistry) throws Exception { super.startClusterComponents(configuration, rpcService, highAvailabilityServices, blobServer, heartbeatServices, metricRegistry); } @@ -113,7 +119,7 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint { RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler) throws Exception { final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 9887d97..93eb3c6 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -432,7 +432,11 @@ public class MesosApplicationMasterRunner { } if (metricRegistry != null) { - metricRegistry.shutdown(); + try { + metricRegistry.shutdown(); + } catch (Throwable t) { + 
LOG.error("Could not shut down metric registry.", t); + } } org.apache.flink.runtime.concurrent.Executors.gracefulShutdown( http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index 7ea4908..1e32b2c 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -49,7 +49,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; @@ -145,7 +145,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler, // Mesos specifics http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala 
---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala index c6230e7..972af35 100644 --- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala +++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala @@ -32,7 +32,6 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.leaderelection.LeaderElectionService import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup -import org.apache.flink.runtime.metrics.{MetricRegistryImpl => FlinkMetricRegistry} import scala.concurrent.duration._ http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index 1cdd087..a45abe0 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -52,6 +52,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import 
org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; @@ -160,7 +161,7 @@ public class MesosResourceManagerTest extends TestLogger { HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler, @@ -306,7 +307,7 @@ public class MesosResourceManagerTest extends TestLogger { public final ScheduledExecutor scheduledExecutor; public final TestingHighAvailabilityServices highAvailabilityServices; public final HeartbeatServices heartbeatServices; - public final MetricRegistryImpl metricRegistry; + public final MetricRegistry metricRegistry; public final TestingLeaderElectionService rmLeaderElectionService; public final JobLeaderIdService jobLeaderIdService; public final SlotManager slotManager; http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java index e6d5e27..3fa0474 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java @@ -34,8 +34,8 @@ import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import 
org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java index a927a30..8f70abb 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java @@ -24,8 +24,8 @@ import org.apache.flink.configuration.MetricOptions; import org.apache.flink.dropwizard.ScheduledDropwizardReporter; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.reporter.MetricReporter; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.util.TestLogger; 
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java index 4c97055..98c2d1b 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java @@ -24,8 +24,8 @@ import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.util.TestMeter; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.util.TestReporter; http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java index f460abd..275f2e1 100644 --- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java +++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java @@ -30,8 +30,8 @@ 
import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.util.TestMeter; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index c2f8539..cf3405b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -45,20 +45,17 @@ import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceOverview; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; -import 
org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; -import akka.actor.ActorSystem; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -88,7 +85,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme private final ResourceManagerGateway resourceManagerGateway; private final JobManagerServices jobManagerServices; private final HeartbeatServices heartbeatServices; - private final MetricRegistryImpl metricRegistry; + private final MetricRegistry metricRegistry; private final FatalErrorHandler fatalErrorHandler; @@ -106,7 +103,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, Optional<String> restAddress) throws Exception { super(rpcService, endpointId); @@ -162,12 +159,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme } try { - metricRegistry.shutdown(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - try { super.postStop(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); @@ -182,11 +173,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme public void start() throws Exception { super.start(); - // start the MetricQueryService - // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint - final ActorSystem actorSystem = ((AkkaRpcService) getRpcService()).getActorSystem(); - metricRegistry.startQueryService(actorSystem, null); - leaderElectionService.start(this); } @@ -479,7 +465,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerServices jobManagerServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, OnCompletionActions onCompleteActions, FatalErrorHandler fatalErrorHandler) throws Exception; http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java index ee92663..5a6889e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerServices; import org.apache.flink.runtime.jobmaster.JobMaster; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -49,7 +49,7 @@ public class StandaloneDispatcher extends Dispatcher { ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, Optional<String> restAddress) throws Exception { super( @@ -74,7 +74,7 @@ public class StandaloneDispatcher extends Dispatcher { HighAvailabilityServices highAvailabilityServices, 
HeartbeatServices heartbeatServices, JobManagerServices jobManagerServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, OnCompletionActions onCompleteActions, FatalErrorHandler fatalErrorHandler) throws Exception { // create the standard job manager runner http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 1a0e2ae..156efdc 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -180,6 +181,11 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { blobServer.start(); heartbeatServices = createHeartbeatServices(configuration); metricRegistry = createMetricRegistry(configuration); + + // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint + // start the MetricQueryService + final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem(); + metricRegistry.startQueryService(actorSystem, null); } protected RpcService createRpcService( @@ 
-278,7 +284,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry) throws Exception; + MetricRegistry metricRegistry) throws Exception; protected void stopClusterComponents(boolean cleanupHaData) throws Exception { } http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java index 50d29da..124c6c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerServices; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -59,7 +59,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry) throws Exception { + MetricRegistry metricRegistry) throws Exception { resourceManager = createResourceManager( configuration, @@ -96,7 +96,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint { HighAvailabilityServices highAvailabilityServices, JobManagerServices jobManagerServices, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler) throws Exception { JobGraph jobGraph = retrieveJobGraph(configuration); @@ -163,7 +163,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler) throws Exception; protected abstract JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException; http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java index 8a48864..e24e01a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.dispatcher.StandaloneDispatcher; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import 
org.apache.flink.runtime.rest.RestServerEndpointConfiguration; @@ -69,7 +69,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry) throws Exception { + MetricRegistry metricRegistry) throws Exception { dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever(); @@ -173,7 +173,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, Optional<String> restAddress) throws Exception { @@ -197,6 +197,6 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java index 7d4373d..e7c9816 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import 
org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; @@ -51,7 +51,7 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler) throws Exception { final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 0a85bbe..14baa6f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.leaderelection.LeaderContender; import 
org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -111,7 +111,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F final HighAvailabilityServices haServices, final HeartbeatServices heartbeatServices, final JobManagerServices jobManagerServices, - final MetricRegistryImpl metricRegistry, + final MetricRegistry metricRegistry, final OnCompletionActions toNotifyOnComplete, final FatalErrorHandler errorHandler) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java index 9aa97cb..782d66a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java @@ -19,9 +19,12 @@ package org.apache.flink.runtime.metrics; import org.apache.flink.metrics.Metric; +import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; import org.apache.flink.runtime.metrics.scope.ScopeFormats; +import javax.annotation.Nullable; + /** * Interface for a metric registry. */ @@ -65,5 +68,18 @@ public interface MetricRegistry { */ void unregister(Metric metric, String metricName, AbstractMetricGroup group); + /** + * Returns the scope formats. 
+ * + * @return scope formats + */ ScopeFormats getScopeFormats(); + + /** + * Returns the path of the {@link MetricQueryService} or null, if none is started. + * + * @return Path of the MetricQueryService or null, if none is started + */ + @Nullable + String getMetricQueryServicePath(); } http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java index 407fa8b..3e4f56f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java @@ -188,6 +188,7 @@ public class MetricRegistryImpl implements MetricRegistry { * * @return address of the metric query service */ + @Override @Nullable public String getMetricQueryServicePath() { return metricQueryServicePath; @@ -238,7 +239,16 @@ public class MetricRegistryImpl implements MetricRegistry { if (queryService != null) { stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); - stopFuture = Patterns.gracefulStop(queryService, stopTimeout); + + try { + stopFuture = Patterns.gracefulStop(queryService, stopTimeout); + } catch (IllegalStateException ignored) { + // this can happen if the underlying actor system has been stopped before shutting + // the metric registry down + // TODO: Pull the MetricQueryService actor out of the MetricRegistry + LOG.debug("The metric query service actor has already been stopped because the " + + "underlying ActorSystem has already been shut down."); + } } if (reporters != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java index 2ecde42..08353e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java @@ -21,6 +21,10 @@ package org.apache.flink.runtime.metrics.util; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.commons.lang3.text.WordUtils; import org.slf4j.Logger; @@ -47,6 +51,21 @@ public class MetricUtils { private MetricUtils() { } + public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup( + MetricRegistry metricRegistry, + TaskManagerLocation taskManagerLocation, + NetworkEnvironment network) { + final TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup( + metricRegistry, + taskManagerLocation.getHostname(), + taskManagerLocation.getResourceID().toString()); + + // Initialize the TM metrics + TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, network); + + return taskManagerMetricGroup; + } + public static void instantiateNetworkMetrics( MetricGroup metrics, final NetworkEnvironment network) { http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index d4248ee..2bbd2c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -32,8 +32,9 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.leaderelection.LeaderAddressAndId; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner; @@ -160,6 +161,10 @@ public class MiniCluster { // we always need the 'commonRpcService' for auxiliary calls commonRpcService = createRpcService(configuration, rpcTimeout, false, null); + // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint + final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem(); + metricRegistry.startQueryService(actorSystem, null); + if (useSingleRpcService) { // set that same RPC service for all JobManagers and TaskManagers for (int i = 0; i < numJobManagers; i++) { @@ -326,6 +331,12 @@ public class MiniCluster { taskManagers = null; } + // metrics shutdown + if (metricRegistry != null) { + metricRegistry.shutdown(); + metricRegistry = null; + } + // shut down the RpcServices exception = shutDownRpc(commonRpcService, exception); exception = shutDownRpcs(jobManagerRpcServices, 
exception); @@ -356,12 +367,6 @@ public class MiniCluster { haServices = null; } - // metrics shutdown - if (metricRegistry != null) { - metricRegistry.shutdown(); - metricRegistry = null; - } - // if anything went wrong, throw the first error with all the additional suppressed exceptions if (exception != null) { ExceptionUtils.rethrowException(exception, "Error while shutting down mini cluster"); @@ -502,7 +507,7 @@ public class MiniCluster { Configuration configuration, HighAvailabilityServices haServices, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, int numResourceManagers, RpcService[] resourceManagerRpcServices) throws Exception { @@ -528,7 +533,7 @@ public class MiniCluster { protected TaskExecutor[] startTaskManagers( Configuration configuration, HighAvailabilityServices haServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, int numTaskManagers, RpcService[] taskManagerRpcServices) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java index ca042b6..60d9a66 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerServices; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import 
org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.ExceptionUtils; @@ -75,7 +75,7 @@ public class MiniClusterJobDispatcher { private final JobManagerServices jobManagerServices; /** Registry for all metrics in the mini cluster */ - private final MetricRegistryImpl metricRegistry; + private final MetricRegistry metricRegistry; /** The number of JobManagers to launch (more than one simulates a high-availability setup) */ private final int numJobManagers; @@ -104,7 +104,7 @@ public class MiniClusterJobDispatcher { HighAvailabilityServices haServices, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry) throws Exception { + MetricRegistry metricRegistry) throws Exception { this( config, haServices, @@ -132,7 +132,7 @@ public class MiniClusterJobDispatcher { HighAvailabilityServices haServices, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, int numJobManagers, RpcService[] rpcServices) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java index 90fb115..a8c402a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java @@ -95,6 +95,8 @@ public class StandaloneMiniCluster { metricRegistry = new MetricRegistryImpl( MetricRegistryConfiguration.fromConfiguration(configuration)); + 
metricRegistry.startQueryService(actorSystem, null); + JobManager.startJobManagerActors( configuration, actorSystem, @@ -142,6 +144,12 @@ public class StandaloneMiniCluster { public void close() throws Exception { Exception exception = null; + try { + metricRegistry.shutdown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + actorSystem.shutdown(); actorSystem.awaitTermination(); http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 98b80c6..cccaf95 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -42,7 +42,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; @@ -118,7 +118,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager; /** Registry to use for metrics. */ - private final MetricRegistryImpl metricRegistry; + private final MetricRegistry metricRegistry; /** Fatal error handler. 
*/ private final FatalErrorHandler fatalErrorHandler; @@ -140,7 +140,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index cc2766b..f67368c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -174,7 +174,7 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager * Requests the paths for the TaskManager's {@link MetricQueryService} to query. 
* * @param timeout for the asynchronous operation - * @return Future containing the collection of instance ids and the corresponding metric query service path + * @return Future containing the collection of resource ids and the corresponding metric query service path */ CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java index 361bdd4..caa3ba0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java @@ -22,7 +22,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.FlinkException; @@ -55,7 +55,7 @@ public class ResourceManagerRunner implements FatalErrorHandler { final RpcService rpcService, final HighAvailabilityServices highAvailabilityServices, final HeartbeatServices heartbeatServices, - final MetricRegistryImpl metricRegistry) throws Exception { + final MetricRegistry metricRegistry) throws Exception { Preconditions.checkNotNull(resourceId); 
Preconditions.checkNotNull(configuration); http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java index d2b1205..624f31d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -45,7 +45,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> { HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { super( http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java index 4d6ccd5..118e356 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java @@ -38,7 +38,6 @@ import org.apache.flink.runtime.rest.handler.RedirectHandler; import org.apache.flink.runtime.rest.handler.WebHandler; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.StringUtils; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; @@ -66,7 +65,9 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; +import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; +import java.net.URLDecoder; import java.nio.channels.FileChannel; import java.util.HashMap; import java.util.Objects; @@ -160,14 +161,22 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im executor); } - final String taskManagerID = routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY); + final String taskManagerId = routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY); final HttpRequest request = routed.request(); //fetch TaskManager logs if no other process is currently doing it - if (lastRequestPending.putIfAbsent(taskManagerID, true) == null) { + if (lastRequestPending.putIfAbsent(taskManagerId, true) == null) { try { - ResourceID resourceId = new ResourceID(new String(StringUtils.hexStringToByte(taskManagerID))); - CompletableFuture<Optional<Instance>> taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(resourceId, timeout); 
+ final String unescapedString; + + try { + unescapedString = URLDecoder.decode(taskManagerId, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new FlinkException("Could not decode task manager id: " + taskManagerId + '.', e); + } + + final ResourceID resourceId = new ResourceID(unescapedString); + final CompletableFuture<Optional<Instance>> taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(resourceId, timeout); CompletableFuture<TransientBlobKey> blobKeyFuture = taskManagerFuture.thenCompose( (Optional<Instance> optTMInstance) -> { @@ -189,18 +198,18 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im (blobKey, blobCache) -> { //delete previous log file, if it is different than the current one HashMap<String, TransientBlobKey> lastSubmittedFile = fileMode == FileMode.LOG ? lastSubmittedLog : lastSubmittedStdout; - if (lastSubmittedFile.containsKey(taskManagerID)) { + if (lastSubmittedFile.containsKey(taskManagerId)) { // the BlobKey will almost certainly be different but the old file // may not exist anymore so we cannot rely on it and need to // download the new file anyway, even if the hashes match - if (!Objects.equals(blobKey, lastSubmittedFile.get(taskManagerID))) { - if (!blobCache.deleteFromCache(lastSubmittedFile.get(taskManagerID))) { - throw new CompletionException(new FlinkException("Could not delete file for " + taskManagerID + '.')); + if (!Objects.equals(blobKey, lastSubmittedFile.get(taskManagerId))) { + if (!blobCache.deleteFromCache(lastSubmittedFile.get(taskManagerId))) { + throw new CompletionException(new FlinkException("Could not delete file for " + taskManagerId + '.')); } - lastSubmittedFile.put(taskManagerID, blobKey); + lastSubmittedFile.put(taskManagerId, blobKey); } } else { - lastSubmittedFile.put(taskManagerID, blobKey); + lastSubmittedFile.put(taskManagerId, blobKey); } try { return blobCache.getFile(blobKey).getAbsolutePath(); @@ -214,7 +223,7 @@ public class 
TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im failure -> { display(ctx, request, "Fetching TaskManager log failed."); LOG.error("Fetching TaskManager log failed.", failure); - lastRequestPending.remove(taskManagerID); + lastRequestPending.remove(taskManagerId); return null; }); @@ -261,7 +270,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im // write the content. ChannelFuture lastContentFuture; final GenericFutureListener<Future<? super Void>> completionListener = future -> { - lastRequestPending.remove(taskManagerID); + lastRequestPending.remove(taskManagerId); fc.close(); raf.close(); }; @@ -294,7 +303,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im } catch (Exception e) { display(ctx, request, "Error: " + e.getMessage()); LOG.error("Fetching TaskManager log failed.", e); - lastRequestPending.remove(taskManagerID); + lastRequestPending.remove(taskManagerId); } } else { display(ctx, request, "loading..."); http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index a956111..c48d188 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -55,7 +55,6 @@ import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; import 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.registration.RegistrationConnectionListener; @@ -134,9 +133,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { /** The network component in the task manager */ private final NetworkEnvironment networkEnvironment; - /** The metric registry in the task manager */ - private final MetricRegistryImpl metricRegistry; - /** The heartbeat manager for job manager in the task manager */ private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager; @@ -179,7 +175,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { NetworkEnvironment networkEnvironment, HighAvailabilityServices haServices, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry, TaskManagerMetricGroup taskManagerMetricGroup, BroadcastVariableManager broadcastVariableManager, FileCache fileCache, @@ -198,7 +193,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { this.ioManager = checkNotNull(ioManager); this.networkEnvironment = checkNotNull(networkEnvironment); this.haServices = checkNotNull(haServices); - this.metricRegistry = checkNotNull(metricRegistry); this.taskSlotTable = checkNotNull(taskSlotTable); this.fatalErrorHandler = checkNotNull(fatalErrorHandler); this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup); http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 5a69bb1..a24daf0 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -29,8 +29,11 @@ import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.util.MetricUtils; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; @@ -75,7 +78,7 @@ public class TaskManagerRunner implements FatalErrorHandler { private final Configuration configuration; - private final ResourceID resourceID; + private final ResourceID resourceId; private final Time timeout; @@ -92,7 +95,7 @@ public class TaskManagerRunner implements FatalErrorHandler { public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception { this.configuration = Preconditions.checkNotNull(configuration); - this.resourceID = Preconditions.checkNotNull(resourceId); + this.resourceId = Preconditions.checkNotNull(resourceId); timeout = AkkaUtils.getTimeoutAsTime(configuration); @@ -111,12 +114,13 @@ public class TaskManagerRunner implements FatalErrorHandler { metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration)); + // TODO: Temporary hack until the MetricQueryService has been ported to RpcEndpoint final ActorSystem actorSystem = ((AkkaRpcService) 
rpcService).getActorSystem(); metricRegistry.startQueryService(actorSystem, resourceId); taskManager = startTaskManager( - configuration, - resourceId, + this.configuration, + this.resourceId, rpcService, highAvailabilityServices, heartbeatServices, @@ -242,14 +246,14 @@ public class TaskManagerRunner implements FatalErrorHandler { // -------------------------------------------------------------------------------------------- public static TaskExecutor startTaskManager( - Configuration configuration, - ResourceID resourceID, - RpcService rpcService, - HighAvailabilityServices highAvailabilityServices, - HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry, - boolean localCommunicationOnly, - FatalErrorHandler fatalErrorHandler) throws Exception { + Configuration configuration, + ResourceID resourceID, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + MetricRegistry metricRegistry, + boolean localCommunicationOnly, + FatalErrorHandler fatalErrorHandler) throws Exception { Preconditions.checkNotNull(configuration); Preconditions.checkNotNull(resourceID); @@ -266,8 +270,12 @@ public class TaskManagerRunner implements FatalErrorHandler { TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, - resourceID, - metricRegistry); + resourceID); + + TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup( + metricRegistry, + taskManagerServices.getTaskManagerLocation(), + taskManagerServices.getNetworkEnvironment()); TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); @@ -280,8 +288,7 @@ public class TaskManagerRunner implements FatalErrorHandler { taskManagerServices.getNetworkEnvironment(), highAvailabilityServices, heartbeatServices, - metricRegistry, - taskManagerServices.getTaskManagerMetricGroup(), + taskManagerMetricGroup, 
taskManagerServices.getBroadcastVariableManager(), taskManagerServices.getFileCache(), taskManagerServices.getTaskSlotTable(), http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 85e62c6..aed03f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -37,16 +37,12 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; -import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.query.KvStateClientProxy; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.KvStateServer; import org.apache.flink.runtime.query.QueryableStateUtils; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; -import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.EnvironmentInformation; @@ -63,7 +59,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; /** * Container for {@link 
TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager}, - * {@link NetworkEnvironment} and the {@link MetricRegistryImpl}. + * {@link NetworkEnvironment}. */ public class TaskManagerServices { private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class); @@ -73,7 +69,6 @@ public class TaskManagerServices { private final MemoryManager memoryManager; private final IOManager ioManager; private final NetworkEnvironment networkEnvironment; - private final TaskManagerMetricGroup taskManagerMetricGroup; private final BroadcastVariableManager broadcastVariableManager; private final FileCache fileCache; private final TaskSlotTable taskSlotTable; @@ -85,7 +80,6 @@ public class TaskManagerServices { MemoryManager memoryManager, IOManager ioManager, NetworkEnvironment networkEnvironment, - TaskManagerMetricGroup taskManagerMetricGroup, BroadcastVariableManager broadcastVariableManager, FileCache fileCache, TaskSlotTable taskSlotTable, @@ -96,7 +90,6 @@ public class TaskManagerServices { this.memoryManager = Preconditions.checkNotNull(memoryManager); this.ioManager = Preconditions.checkNotNull(ioManager); this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment); - this.taskManagerMetricGroup = Preconditions.checkNotNull(taskManagerMetricGroup); this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager); this.fileCache = Preconditions.checkNotNull(fileCache); this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable); @@ -124,10 +117,6 @@ public class TaskManagerServices { return taskManagerLocation; } - public TaskManagerMetricGroup getTaskManagerMetricGroup() { - return taskManagerMetricGroup; - } - public BroadcastVariableManager getBroadcastVariableManager() { return broadcastVariableManager; } @@ -157,14 +146,12 @@ public class TaskManagerServices { * * @param resourceID resource ID of the task manager * @param taskManagerServicesConfiguration task manager configuration - * @param 
metricRegistry to register the TaskManagerMetricGroup * @return task manager components * @throws Exception */ public static TaskManagerServices fromConfiguration( TaskManagerServicesConfiguration taskManagerServicesConfiguration, - ResourceID resourceID, - MetricRegistry metricRegistry) throws Exception { + ResourceID resourceID) throws Exception { // pre-start checks checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths()); @@ -183,14 +170,6 @@ public class TaskManagerServices { // start the I/O manager, it will create some temp directories. final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths()); - final TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup( - metricRegistry, - taskManagerLocation.getHostname(), - taskManagerLocation.getResourceID().toString()); - - // Initialize the TM metrics - TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, network); - final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager(); final FileCache fileCache = new FileCache(taskManagerServicesConfiguration.getTmpDirPaths()); @@ -216,7 +195,6 @@ public class TaskManagerServices { memoryManager, ioManager, network, - taskManagerMetricGroup, broadcastVariableManager, fileCache, taskSlotTable, http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 689d98f..227b854 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -122,8 +122,7 @@ abstract class 
FlinkMiniCluster( Hardware.getNumberCPUCores(), new ExecutorThreadFactory("mini-cluster-io")) - protected val metricRegistry = new MetricRegistryImpl( - MetricRegistryConfiguration.fromConfiguration(originalConfiguration)) + protected var metricRegistryOpt: Option[MetricRegistryImpl] = None def this(configuration: Configuration, useSingleActorSystem: Boolean) { this( @@ -329,6 +328,11 @@ abstract class FlinkMiniCluster( lazy val singleActorSystem = startJobManagerActorSystem(0) + val metricRegistry = new MetricRegistryImpl( + MetricRegistryConfiguration.fromConfiguration(originalConfiguration)) + + metricRegistryOpt = Some(metricRegistry) + if (originalConfiguration.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) { metricRegistry.startQueryService(singleActorSystem, null) } @@ -463,6 +467,8 @@ abstract class FlinkMiniCluster( Await.ready(Future.sequence(jmFutures ++ tmFutures ++ rmFutures), timeout) + metricRegistryOpt.foreach(_.shutdown()) + if (!useSingleActorSystem) { taskManagerActorSystems foreach { _ foreach(_.shutdown()) @@ -476,7 +482,6 @@ abstract class FlinkMiniCluster( jobManagerActorSystems foreach { _ foreach(_.shutdown()) } - } def awaitTermination(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index e9bdb2a..89197e2 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -19,13 +19,12 @@ package org.apache.flink.runtime.minicluster import java.net.InetAddress -import java.util.UUID import 
java.util.concurrent.{Executor, ScheduledExecutorService} import akka.actor.{ActorRef, ActorSystem, Props} import org.apache.flink.api.common.JobID import org.apache.flink.api.common.io.FileOutputFormat -import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, QueryableStateOptions, ResourceManagerOptions, TaskManagerOptions} +import org.apache.flink.configuration._ import org.apache.flink.core.fs.Path import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory @@ -48,14 +47,14 @@ import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.messages.JobManagerMessages import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse} import org.apache.flink.runtime.metrics.groups.{JobManagerMetricGroup, TaskManagerMetricGroup} -import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl} +import org.apache.flink.runtime.metrics.util.MetricUtils import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration} import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} import org.apache.flink.runtime.util.EnvironmentInformation import org.apache.flink.util.NetUtils -import scala.concurrent.{Await, ExecutionContext} import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{Await, ExecutionContext} /** * Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same @@ -85,12 +84,6 @@ class LocalFlinkMiniCluster( def this(userConfiguration: Configuration) = this(userConfiguration, true) - override def startInternalShutdown() { - metricRegistry.shutdown() - - super.startInternalShutdown() - } - // -------------------------------------------------------------------------- override def generateConfiguration(userConfiguration: Configuration): 
Configuration = { @@ -158,7 +151,7 @@ class LocalFlinkMiniCluster( futureExecutor, ioExecutor, highAvailabilityServices.createBlobStore(), - metricRegistry) + metricRegistryOpt.get) val archive = system.actorOf( getArchiveProps( @@ -253,8 +246,12 @@ class LocalFlinkMiniCluster( val taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, - resourceID, - metricRegistry) + resourceID) + + val taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup( + metricRegistryOpt.get, + taskManagerServices.getTaskManagerLocation(), + taskManagerServices.getNetworkEnvironment()) val props = getTaskManagerProps( taskManagerClass, @@ -264,11 +261,7 @@ class LocalFlinkMiniCluster( taskManagerServices.getMemoryManager(), taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment, - taskManagerServices.getTaskManagerMetricGroup) - - if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) { - metricRegistry.startQueryService(system, resourceID) - } + taskManagerMetricGroup) system.actorOf(props, taskManagerActorName) } http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index f209dac..f948df4 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -63,6 +63,7 @@ import org.apache.flink.runtime.messages.TaskMessages._ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint} import org.apache.flink.runtime.messages.{Acknowledge, StackTraceSampleResponse} import 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup +import org.apache.flink.runtime.metrics.util.MetricUtils import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry} import org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils} @@ -1888,7 +1889,12 @@ object TaskManager { } // shut down the metric query service - metricRegistry.shutdown() + try { + metricRegistry.shutdown() + } catch { + case t: Throwable => + LOG.error("Could not properly shut down the metric registry.", t) + } } /** @@ -1996,8 +2002,12 @@ object TaskManager { val taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, - resourceID, - metricRegistry) + resourceID) + + val taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup( + metricRegistry, + taskManagerServices.getTaskManagerLocation(), + taskManagerServices.getNetworkEnvironment()) // create the actor properties (which define the actor constructor parameters) val tmProps = getTaskManagerProps( @@ -2009,7 +2019,7 @@ object TaskManager { taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment(), highAvailabilityServices, - taskManagerServices.getTaskManagerMetricGroup) + taskManagerMetricGroup) taskManagerActorName match { case Some(actorName) => actorSystem.actorOf(tmProps, actorName) http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 8558145..a511d45 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -209,7 +210,7 @@ public class DispatcherTest extends TestLogger { ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, JobManagerRunner jobManagerRunner, JobID expectedJobId) throws Exception { @@ -238,7 +239,7 @@ public class DispatcherTest extends TestLogger { HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerServices jobManagerServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, OnCompletionActions onCompleteActions, FatalErrorHandler fatalErrorHandler) throws Exception { assertEquals(expectedJobId, jobGraph.getJobID()); http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index d843da2..88141d6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -53,8 +53,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; @@ -62,7 +62,6 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.state.OperatorStateHandle; @@ -96,7 +95,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -208,7 +206,7 @@ public class JobManagerHARecoveryTest extends TestLogger { mySubmittedJobGraphStore, checkpointStateFactory, jobRecoveryTimeout, - Option.<MetricRegistryImpl>empty(), + new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"), Option.<String>empty()); jobManager = system.actorOf(jobManagerProps); @@ -383,7 +381,7 @@ public class JobManagerHARecoveryTest extends TestLogger { submittedJobGraphStore, mock(CheckpointRecoveryFactory.class), 
jobRecoveryTimeout, - Option.<MetricRegistryImpl>apply(null), + new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"), recoveredJobs).withDispatcher(CallingThreadDispatcher.Id()); jobManager = system.actorOf(jobManagerProps);
