http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
new file mode 100644
index 0000000..407fa8b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.pattern.Patterns;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the
+ * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}.
+ */
+public class MetricRegistryImpl implements MetricRegistry {
+	static final Logger LOG = LoggerFactory.getLogger(MetricRegistryImpl.class);
+
+	private final Object lock = new Object();
+
+	/** Configured reporters; set to {@code null} by {@link #shutdown()}. */
+	private List<MetricReporter> reporters;
+	private ScheduledExecutorService executor;
+
+	@Nullable
+	private ActorRef queryService;
+
+	@Nullable
+	private String metricQueryServicePath;
+
+	private ViewUpdater viewUpdater;
+
+	private final ScopeFormats scopeFormats;
+	private final char globalDelimiter;
+	/** Scope delimiter per reporter; an entry is added right after the corresponding reporter is added. */
+	private final List<Character> delimiters = new ArrayList<>();
+
+	/**
+	 * Creates a new MetricRegistry and starts the configured reporters.
+	 *
+	 * <p>A reporter that cannot be instantiated or opened is logged and skipped; it does not prevent
+	 * the creation of the registry.
+	 *
+	 * @param config provides the scope formats, the global delimiter and the reporter configurations
+	 */
+	public MetricRegistryImpl(MetricRegistryConfiguration config) {
+		this.scopeFormats = config.getScopeFormats();
+		this.globalDelimiter = config.getDelimiter();
+
+		// instantiate any custom configured reporters
+		this.reporters = new ArrayList<>();
+
+		List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations();
+
+		this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));
+
+		this.queryService = null;
+		this.metricQueryServicePath = null;
+
+		if (reporterConfigurations.isEmpty()) {
+			// no reporters defined
+			// by default, don't report anything
+			LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
+		} else {
+			// instantiate and open every configured reporter
+			for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
+				String namedReporter = reporterConfiguration.f0;
+				Configuration reporterConfig = reporterConfiguration.f1;
+
+				final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
+				if (className == null) {
+					LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
+					continue;
+				}
+
+				try {
+					String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
+					TimeUnit timeunit = TimeUnit.SECONDS;
+					long period = 10;
+
+					if (configuredPeriod != null) {
+						try {
+							String[] interval = configuredPeriod.split(" ");
+							long parsedPeriod = Long.parseLong(interval[0]);
+							TimeUnit parsedUnit = TimeUnit.valueOf(interval[1]);
+							if (parsedPeriod <= 0) {
+								// scheduleWithFixedDelay rejects non-positive delays, which would drop the
+								// already opened reporter without ever closing it
+								throw new IllegalArgumentException("The reporting interval must be positive.");
+							}
+							// apply the configured values only once both parts are valid, so that a partially
+							// valid value such as '500 MILLIS' does not turn into 500 SECONDS
+							period = parsedPeriod;
+							timeunit = parsedUnit;
+						}
+						catch (Exception e) {
+							LOG.error("Cannot parse report interval from config: " + configuredPeriod +
+									" - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
+									"Using default reporting interval.");
+						}
+					}
+
+					Class<?> reporterClass = Class.forName(className);
+					MetricReporter reporterInstance = (MetricReporter) reporterClass.newInstance();
+
+					MetricConfig metricConfig = new MetricConfig();
+					reporterConfig.addAllToProperties(metricConfig);
+					LOG.info("Configuring {} with {}.", reporterClass.getSimpleName(), metricConfig);
+					reporterInstance.open(metricConfig);
+
+					if (reporterInstance instanceof Scheduled) {
+						LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
+
+						executor.scheduleWithFixedDelay(
+								new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
+					} else {
+						LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
+					}
+					reporters.add(reporterInstance);
+
+					String delimiterForReporter = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, String.valueOf(globalDelimiter));
+					if (delimiterForReporter.length() != 1) {
+						LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter);
+						delimiterForReporter = String.valueOf(globalDelimiter);
+					}
+					this.delimiters.add(delimiterForReporter.charAt(0));
+				}
+				catch (Throwable t) {
+					LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Initializes the MetricQueryService.
+	 *
+	 * @param actorSystem ActorSystem to create the MetricQueryService on
+	 * @param resourceID resource ID used to disambiguate the actor name
+	 */
+	public void startQueryService(ActorSystem actorSystem, ResourceID resourceID) {
+		synchronized (lock) {
+			Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");
+
+			try {
+				queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID);
+				metricQueryServicePath = AkkaUtils.getAkkaURL(actorSystem, queryService);
+			} catch (Exception e) {
+				LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
+			}
+		}
+	}
+
+	/**
+	 * Returns the address under which the {@link MetricQueryService} is reachable.
+	 *
+	 * @return address of the metric query service, or {@code null} if it has not been (successfully) started
+	 */
+	@Nullable
+	public String getMetricQueryServicePath() {
+		return metricQueryServicePath;
+	}
+
+	@Override
+	public char getDelimiter() {
+		return this.globalDelimiter;
+	}
+
+	@Override
+	public char getDelimiter(int reporterIndex) {
+		try {
+			return delimiters.get(reporterIndex);
+		} catch (IndexOutOfBoundsException e) {
+			LOG.warn("Delimiter for reporter index {} not found, returning global delimiter.", reporterIndex);
+			return this.globalDelimiter;
+		}
+	}
+
+	@Override
+	public int getNumberReporters() {
+		// the reporters are set to null on shutdown; read the field once so that the null check and
+		// the size() call operate on the same reference
+		final List<MetricReporter> currentReporters = reporters;
+		return currentReporters == null ? 0 : currentReporters.size();
+	}
+
+	/**
+	 * Returns the configured reporters.
+	 *
+	 * @return the list of reporters, or {@code null} once this registry has been shut down
+	 */
+	public List<MetricReporter> getReporters() {
+		return reporters;
+	}
+
+	/**
+	 * Returns whether this registry has been shutdown.
+	 *
+	 * @return true, if this registry was shutdown, otherwise false
+	 */
+	public boolean isShutdown() {
+		synchronized (lock) {
+			return reporters == null && executor.isShutdown();
+		}
+	}
+
+	/**
+	 * Shuts down this registry: closes all reporters, shuts down the executor and stops the
+	 * {@link MetricQueryService}, killing it if it does not stop gracefully within one second.
+	 */
+	public void shutdown() {
+		synchronized (lock) {
+			Future<Boolean> stopFuture = null;
+			FiniteDuration stopTimeout = null;
+
+			if (queryService != null) {
+				stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
+				stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
+			}
+
+			if (reporters != null) {
+				for (MetricReporter reporter : reporters) {
+					try {
+						reporter.close();
+					} catch (Throwable t) {
+						LOG.warn("Metrics reporter did not shut down cleanly", t);
+					}
+				}
+				reporters = null;
+			}
+			shutdownExecutor();
+
+			if (stopFuture != null) {
+				boolean stopped = false;
+
+				try {
+					stopped = Await.result(stopFuture, stopTimeout);
+				} catch (Exception e) {
+					LOG.warn("Query actor did not properly stop.", e);
+				}
+
+				if (!stopped) {
+					// the query actor did not stop in time, let's kill him
+					queryService.tell(Kill.getInstance(), ActorRef.noSender());
+				}
+			}
+		}
+	}
+
+	/**
+	 * Shuts down the executor, waiting up to one second for running tasks before forcing termination.
+	 */
+	private void shutdownExecutor() {
+		if (executor != null) {
+			executor.shutdown();
+
+			try {
+				if (!executor.awaitTermination(1L, TimeUnit.SECONDS)) {
+					executor.shutdownNow();
+				}
+			} catch (InterruptedException e) {
+				executor.shutdownNow();
+				// restore the interrupt flag so that the caller can still observe the interruption
+				Thread.currentThread().interrupt();
+			}
+		}
+	}
+
+	@Override
+	public ScopeFormats getScopeFormats() {
+		return scopeFormats;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Metrics (de)registration
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Notifies all reporters, the query service and, for {@link View Views}, the view updater
+	 * about the added metric. Failures of individual listeners are logged and not propagated.
+	 */
+	@Override
+	public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+		synchronized (lock) {
+			if (isShutdown()) {
+				LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
+			} else {
+				if (reporters != null) {
+					for (int i = 0; i < reporters.size(); i++) {
+						MetricReporter reporter = reporters.get(i);
+						try {
+							if (reporter != null) {
+								FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
+								reporter.notifyOfAddedMetric(metric, metricName, front);
+							}
+						} catch (Exception e) {
+							LOG.warn("Error while registering metric.", e);
+						}
+					}
+				}
+				try {
+					if (queryService != null) {
+						MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
+					}
+				} catch (Exception e) {
+					LOG.warn("Error while registering metric.", e);
+				}
+				try {
+					if (metric instanceof View) {
+						if (viewUpdater == null) {
+							viewUpdater = new ViewUpdater(executor);
+						}
+						viewUpdater.notifyOfAddedView((View) metric);
+					}
+				} catch (Exception e) {
+					LOG.warn("Error while registering metric.", e);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Notifies all reporters, the query service and, for {@link View Views}, the view updater
+	 * about the removed metric. Failures of individual listeners are logged and not propagated.
+	 */
+	@Override
+	public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+		synchronized (lock) {
+			if (isShutdown()) {
+				LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down.");
+			} else {
+				if (reporters != null) {
+					for (int i = 0; i < reporters.size(); i++) {
+						try {
+							MetricReporter reporter = reporters.get(i);
+							if (reporter != null) {
+								FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
+								reporter.notifyOfRemovedMetric(metric, metricName, front);
+							}
+						} catch (Exception e) {
+							LOG.warn("Error while unregistering metric.", e);
+						}
+					}
+				}
+				try {
+					if (queryService != null) {
+						MetricQueryService.notifyOfRemovedMetric(queryService, metric);
+					}
+				} catch (Exception e) {
+					LOG.warn("Error while unregistering metric.", e);
+				}
+				try {
+					if (metric instanceof View) {
+						if (viewUpdater != null) {
+							viewUpdater.notifyOfRemovedView((View) metric);
+						}
+					}
+				} catch (Exception e) {
+					LOG.warn("Error while unregistering metric.", e);
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	@Nullable
+	public ActorRef getQueryService() {
+		return queryService;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Task which calls {@link Scheduled#report()} on a single reporter. Any failure is logged instead
+	 * of propagated, because an exception escaping a task scheduled via
+	 * {@link ScheduledExecutorService#scheduleWithFixedDelay} suppresses all subsequent executions.
+	 *
+	 * <p>This task is explicitly a static class, so that it does not hold any reference to the
+	 * enclosing MetricRegistryImpl instance: the executor keeps every scheduled task reachable, and an
+	 * anonymous inner class would additionally keep the whole registry reachable through its
+	 * enclosing-instance pointer.
+	 */
+	private static final class ReporterTask implements Runnable {
+
+		private final Scheduled reporter;
+
+		private ReporterTask(Scheduled reporter) {
+			this.reporter = reporter;
+		}
+
+		@Override
+		public void run() {
+			try {
+				reporter.report();
+			} catch (Throwable t) {
+				LOG.warn("Error while reporting metrics", t);
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index ab59977..66eace5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -106,7 +106,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl this.registry = checkNotNull(registry); this.scopeComponents = checkNotNull(scope); this.parent = parent; - this.scopeStrings = new String[registry.getReporters() == null ? 0 : registry.getReporters().size()]; + this.scopeStrings = new String[registry.getNumberReporters()]; } public Map<String, String> getAllVariables() { http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index dd352bb..d4248ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.leaderelection.LeaderAddressAndId; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import 
org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; @@ -68,7 +68,7 @@ public class MiniCluster { private final MiniClusterConfiguration miniClusterConfiguration; @GuardedBy("lock") - private MetricRegistry metricRegistry; + private MetricRegistryImpl metricRegistry; @GuardedBy("lock") private RpcService commonRpcService; @@ -464,8 +464,8 @@ public class MiniCluster { * * @param config The configuration of the mini cluster */ - protected MetricRegistry createMetricRegistry(Configuration config) { - return new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); + protected MetricRegistryImpl createMetricRegistry(Configuration config) { + return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); } /** @@ -502,7 +502,7 @@ public class MiniCluster { Configuration configuration, HighAvailabilityServices haServices, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, + MetricRegistryImpl metricRegistry, int numResourceManagers, RpcService[] resourceManagerRpcServices) throws Exception { @@ -528,7 +528,7 @@ public class MiniCluster { protected TaskExecutor[] startTaskManagers( Configuration configuration, HighAvailabilityServices haServices, - MetricRegistry metricRegistry, + MetricRegistryImpl metricRegistry, int numTaskManagers, RpcService[] taskManagerRpcServices) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java index 60d9a66..ca042b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerServices; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.ExceptionUtils; @@ -75,7 +75,7 @@ public class MiniClusterJobDispatcher { private final JobManagerServices jobManagerServices; /** Registry for all metrics in the mini cluster */ - private final MetricRegistry metricRegistry; + private final MetricRegistryImpl metricRegistry; /** The number of JobManagers to launch (more than one simulates a high-availability setup) */ private final int numJobManagers; @@ -104,7 +104,7 @@ public class MiniClusterJobDispatcher { HighAvailabilityServices haServices, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry) throws Exception { + MetricRegistryImpl metricRegistry) throws Exception { this( config, haServices, @@ -132,7 +132,7 @@ public class MiniClusterJobDispatcher { HighAvailabilityServices haServices, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, + MetricRegistryImpl metricRegistry, int numJobManagers, RpcService[] rpcServices) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java index cbefe5a..90fb115 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java @@ -18,9 +18,6 @@ package org.apache.flink.runtime.minicluster; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.pattern.Patterns; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; @@ -31,18 +28,25 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.messages.TaskManagerMessages; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; -import scala.Option; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.pattern.Patterns; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + /** * Mini cluster to run the old JobManager code without embedded high availability services. 
This * class has been implemented because the normal {@link FlinkMiniCluster} has been changed to use @@ -63,6 +67,8 @@ public class StandaloneMiniCluster { private final HighAvailabilityServices highAvailabilityServices; + private final MetricRegistryImpl metricRegistry; + private final FiniteDuration timeout; private final int port; @@ -86,21 +92,28 @@ public class StandaloneMiniCluster { Executors.directExecutor(), HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION); + metricRegistry = new MetricRegistryImpl( + MetricRegistryConfiguration.fromConfiguration(configuration)); + JobManager.startJobManagerActors( configuration, actorSystem, scheduledExecutorService, scheduledExecutorService, highAvailabilityServices, + metricRegistry, Option.empty(), JobManager.class, MemoryArchivist.class); + final ResourceID taskManagerResourceId = ResourceID.generate(); + ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor( configuration, - ResourceID.generate(), + taskManagerResourceId, actorSystem, highAvailabilityServices, + metricRegistry, LOCAL_HOSTNAME, Option.<String>empty(), true, http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 1d4d4f3..98b80c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -42,7 +42,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import 
org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; @@ -118,7 +118,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager; /** Registry to use for metrics. */ - private final MetricRegistry metricRegistry; + private final MetricRegistryImpl metricRegistry; /** Fatal error handler. */ private final FatalErrorHandler fatalErrorHandler; @@ -140,7 +140,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, - MetricRegistry metricRegistry, + MetricRegistryImpl metricRegistry, JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { @@ -498,8 +498,8 @@ public abstract class ResourceManager<WorkerType extends Serializable> } @Override - public CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) { - final ArrayList<Tuple2<InstanceID, String>> metricQueryServicePaths = new ArrayList<>(taskExecutors.size()); + public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) { + final ArrayList<Tuple2<ResourceID, String>> metricQueryServicePaths = new ArrayList<>(taskExecutors.size()); for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerRegistrationEntry : taskExecutors.entrySet()) { final ResourceID tmResourceId = workerRegistrationEntry.getKey(); @@ -508,7 +508,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> final String tmMetricQueryServicePath = 
taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + tmResourceId.getResourceIdString(); - metricQueryServicePaths.add(Tuple2.of(workerRegistration.getInstanceID(), tmMetricQueryServicePath)); + metricQueryServicePaths.add(Tuple2.of(tmResourceId, tmMetricQueryServicePath)); } return CompletableFuture.completedFuture(metricQueryServicePaths); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 9eacb4b..cc2766b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -176,5 +176,5 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager * @param timeout for the asynchronous operation - * @return Future containing the collection of instance ids and the corresponding metric query service path + * @return Future containing the collection of resource ids and the corresponding metric query service path */ - CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout); + CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java index caa3ba0..361bdd4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java @@ -22,7 +22,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.FlinkException; @@ -55,7 +55,7 @@ public class ResourceManagerRunner implements FatalErrorHandler { final RpcService rpcService, final HighAvailabilityServices highAvailabilityServices, final HeartbeatServices heartbeatServices, - final MetricRegistry metricRegistry) throws Exception { + final MetricRegistryImpl metricRegistry) throws Exception { Preconditions.checkNotNull(resourceId); Preconditions.checkNotNull(configuration); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java index 624f31d..d2b1205 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -23,7 +23,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -45,7 +45,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> { HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, - MetricRegistry metricRegistry, + MetricRegistryImpl metricRegistry, JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { super( http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java index cf5bfcb..4d6ccd5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java @@ -31,8 +31,8 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.TransientBlobCache; import org.apache.flink.runtime.blob.TransientBlobKey; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import 
org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.rest.handler.RedirectHandler; import org.apache.flink.runtime.rest.handler.WebHandler; @@ -166,13 +166,13 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im //fetch TaskManager logs if no other process is currently doing it if (lastRequestPending.putIfAbsent(taskManagerID, true) == null) { try { - InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID)); - CompletableFuture<Optional<Instance>> taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout); + ResourceID resourceId = new ResourceID(new String(StringUtils.hexStringToByte(taskManagerID))); + CompletableFuture<Optional<Instance>> taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(resourceId, timeout); CompletableFuture<TransientBlobKey> blobKeyFuture = taskManagerFuture.thenCompose( (Optional<Instance> optTMInstance) -> { Instance taskManagerInstance = optTMInstance.orElseThrow( - () -> new CompletionException(new FlinkException("Could not find instance with " + instanceID + '.'))); + () -> new CompletionException(new FlinkException("Could not find instance with " + resourceId + '.'))); switch (fileMode) { case LOG: return taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(timeout); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java index ad2ee1b..84c6e41 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java @@ -19,19 +19,20 @@ package org.apache.flink.runtime.rest.handler.legacy; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.StringUtils; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import java.io.IOException; import java.io.StringWriter; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -74,8 +75,16 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { // return them in an array. This avoids unnecessary code complexity. // If only one task manager is requested, we only fetch one task manager metrics. 
if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) { - InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY))); - CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout); + final String unescapedString; + + try { + unescapedString = URLDecoder.decode(pathParams.get(TASK_MANAGER_ID_KEY), "UTF-8"); + } catch (UnsupportedEncodingException e) { + return FutureUtils.completedExceptionally(new FlinkException("Could not decode task manager id: " + pathParams.get(TASK_MANAGER_ID_KEY) + '.', e)); + } + + ResourceID resourceId = new ResourceID(unescapedString); + CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(resourceId, timeout); return tmInstanceFuture.thenApplyAsync( (Optional<Instance> optTaskManager) -> { @@ -116,7 +125,7 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { for (Instance instance : instances) { gen.writeStartObject(); - gen.writeStringField("id", instance.getId().toString()); + gen.writeStringField("id", instance.getTaskManagerID().getResourceIdString()); gen.writeStringField("path", instance.getTaskManagerGateway().getAddress()); gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort()); gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat()); @@ -131,7 +140,7 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) { fetcher.update(); - MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString()); + MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getTaskManagerID().getResourceIdString()); if (metrics != null) { gen.writeObjectFieldStart("metrics"); long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0")); 
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java index 1bfb9f2..e71a1d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; @@ -145,20 +145,20 @@ public class MetricFetcher<T extends RestfulGateway> { // TODO: Once the old code has been ditched, remove the explicit TaskManager query service discovery // TODO: and return it as part of requestQueryServicePaths. Moreover, change the MetricStore such that // TODO: we don't have to explicitly retain the valid TaskManagers, e.g. 
letting it be a cache with expiry time - CompletableFuture<Collection<Tuple2<InstanceID, String>>> taskManagerQueryServicePathsFuture = leaderGateway + CompletableFuture<Collection<Tuple2<ResourceID, String>>> taskManagerQueryServicePathsFuture = leaderGateway .requestTaskManagerMetricQueryServicePaths(timeout); taskManagerQueryServicePathsFuture.whenCompleteAsync( - (Collection<Tuple2<InstanceID, String>> queryServicePaths, Throwable throwable) -> { + (Collection<Tuple2<ResourceID, String>> queryServicePaths, Throwable throwable) -> { if (throwable != null) { LOG.warn("Requesting TaskManager's path for query services failed.", throwable); } else { List<String> taskManagersToRetain = queryServicePaths .stream() .map( - (Tuple2<InstanceID, String> tuple) -> { + (Tuple2<ResourceID, String> tuple) -> { retrieveAndQueryMetrics(tuple.f1); - return tuple.f0.toString(); + return tuple.f0.getResourceIdString(); } ).collect(Collectors.toList()); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index cd67705..a956111 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -55,7 +55,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.registration.RegistrationConnectionListener; @@ -135,7 +135,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private final NetworkEnvironment networkEnvironment; /** The metric registry in the task manager */ - private final MetricRegistry metricRegistry; + private final MetricRegistryImpl metricRegistry; /** The heartbeat manager for job manager in the task manager */ private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager; @@ -179,7 +179,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { NetworkEnvironment networkEnvironment, HighAvailabilityServices haServices, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, + MetricRegistryImpl metricRegistry, TaskManagerMetricGroup taskManagerMetricGroup, BroadcastVariableManager broadcastVariableManager, FileCache fileCache, http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 782ab07..5a69bb1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -29,22 +29,19 @@ import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; -import 
org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; -import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; -import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.util.Hardware; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.LeaderRetrievalUtils; - import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; @@ -86,7 +83,7 @@ public class TaskManagerRunner implements FatalErrorHandler { private final HighAvailabilityServices highAvailabilityServices; - private final MetricRegistry metricRegistry; + private final MetricRegistryImpl metricRegistry; /** Executor used to run future callbacks */ private final ExecutorService executor; @@ -112,7 +109,7 @@ public class TaskManagerRunner implements FatalErrorHandler { HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration); - metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)); + metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration)); final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem(); metricRegistry.startQueryService(actorSystem, resourceId); @@ -250,7 +247,7 @@ public class 
TaskManagerRunner implements FatalErrorHandler { RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, + MetricRegistryImpl metricRegistry, boolean localCommunicationOnly, FatalErrorHandler fatalErrorHandler) throws Exception { @@ -269,18 +266,11 @@ public class TaskManagerRunner implements FatalErrorHandler { TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, - resourceID); + resourceID, + metricRegistry); TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); - TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup( - metricRegistry, - taskManagerServices.getTaskManagerLocation().getHostname(), - resourceID.toString()); - - // Initialize the TM metrics - TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment()); - return new TaskExecutor( rpcService, taskManagerConfiguration, @@ -291,7 +281,7 @@ public class TaskManagerRunner implements FatalErrorHandler { highAvailabilityServices, heartbeatServices, metricRegistry, - taskManagerMetricGroup, + taskManagerServices.getTaskManagerMetricGroup(), taskManagerServices.getBroadcastVariableManager(), taskManagerServices.getFileCache(), taskManagerServices.getTaskSlotTable(), http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 2baf644..85e62c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.query.KvStateClientProxy; import org.apache.flink.runtime.query.KvStateRegistry; @@ -62,7 +63,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; /** * Container for {@link TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager}, - * {@link NetworkEnvironment} and the {@link MetricRegistry}. + * {@link NetworkEnvironment} and the {@link MetricRegistryImpl}. */ public class TaskManagerServices { private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class); @@ -72,7 +73,6 @@ public class TaskManagerServices { private final MemoryManager memoryManager; private final IOManager ioManager; private final NetworkEnvironment networkEnvironment; - private final MetricRegistry metricRegistry; private final TaskManagerMetricGroup taskManagerMetricGroup; private final BroadcastVariableManager broadcastVariableManager; private final FileCache fileCache; @@ -85,7 +85,6 @@ public class TaskManagerServices { MemoryManager memoryManager, IOManager ioManager, NetworkEnvironment networkEnvironment, - MetricRegistry metricRegistry, TaskManagerMetricGroup taskManagerMetricGroup, BroadcastVariableManager broadcastVariableManager, FileCache fileCache, @@ -97,7 +96,6 @@ public class TaskManagerServices { this.memoryManager = Preconditions.checkNotNull(memoryManager); this.ioManager = Preconditions.checkNotNull(ioManager); this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment); - 
this.metricRegistry = Preconditions.checkNotNull(metricRegistry); this.taskManagerMetricGroup = Preconditions.checkNotNull(taskManagerMetricGroup); this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager); this.fileCache = Preconditions.checkNotNull(fileCache); @@ -126,10 +124,6 @@ public class TaskManagerServices { return taskManagerLocation; } - public MetricRegistry getMetricRegistry() { - return metricRegistry; - } - public TaskManagerMetricGroup getTaskManagerMetricGroup() { return taskManagerMetricGroup; } @@ -163,12 +157,14 @@ public class TaskManagerServices { * * @param resourceID resource ID of the task manager * @param taskManagerServicesConfiguration task manager configuration + * @param metricRegistry to register the TaskManagerMetricGroup * @return task manager components * @throws Exception */ public static TaskManagerServices fromConfiguration( TaskManagerServicesConfiguration taskManagerServicesConfiguration, - ResourceID resourceID) throws Exception { + ResourceID resourceID, + MetricRegistry metricRegistry) throws Exception { // pre-start checks checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths()); @@ -187,9 +183,6 @@ public class TaskManagerServices { // start the I/O manager, it will create some temp directories. 
final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths()); - final MetricRegistry metricRegistry = new MetricRegistry( - taskManagerServicesConfiguration.getMetricRegistryConfiguration()); - final TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup( metricRegistry, taskManagerLocation.getHostname(), @@ -223,7 +216,6 @@ public class TaskManagerServices { memoryManager, ioManager, network, - metricRegistry, taskManagerMetricGroup, broadcastVariableManager, fileCache, http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index bfd37bc..990fb22 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.util.MathUtils; import org.apache.flink.util.NetUtils; @@ -72,8 +71,6 @@ public class TaskManagerServicesConfiguration { private final float memoryFraction; - private final MetricRegistryConfiguration metricRegistryConfiguration; - private final long timerServiceShutdownTimeout; public TaskManagerServicesConfiguration( @@ -85,7 +82,6 @@ public 
class TaskManagerServicesConfiguration { long configuredMemory, boolean preAllocateMemory, float memoryFraction, - MetricRegistryConfiguration metricRegistryConfiguration, long timerServiceShutdownTimeout) { this.taskManagerAddress = checkNotNull(taskManagerAddress); @@ -98,8 +94,6 @@ public class TaskManagerServicesConfiguration { this.preAllocateMemory = preAllocateMemory; this.memoryFraction = memoryFraction; - this.metricRegistryConfiguration = checkNotNull(metricRegistryConfiguration); - checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " + "service shutdown timeout must be greater or equal to 0."); this.timerServiceShutdownTimeout = timerServiceShutdownTimeout; @@ -148,10 +142,6 @@ public class TaskManagerServicesConfiguration { return preAllocateMemory; } - public MetricRegistryConfiguration getMetricRegistryConfiguration() { - return metricRegistryConfiguration; - } - public long getTimerServiceShutdownTimeout() { return timerServiceShutdownTimeout; } @@ -211,8 +201,6 @@ public class TaskManagerServicesConfiguration { TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), "MemoryManager fraction of the free memory must be between 0.0 and 1.0"); - final MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration); - long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis(); return new TaskManagerServicesConfiguration( @@ -224,7 +212,6 @@ public class TaskManagerServicesConfiguration { configuredMemory, preAllocateMemory, memoryFraction, - metricRegistryConfiguration, timerServiceShutdownTimeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java index d871b06..331e96b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java @@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; @@ -95,5 +95,5 @@ public interface RestfulGateway extends RpcGateway { * @param timeout for the asynchronous operation * @return Future containing the collection of instance ids and the corresponding metric query service path */ - CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout); + CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala index 74ef1de..1c573c0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala +++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala @@ -35,7 +35,8 @@ import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} import org.apache.flink.runtime.leaderelection.LeaderElectionService import org.apache.flink.runtime.messages.Acknowledge import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus} -import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup +import org.apache.flink.runtime.metrics.{MetricRegistryImpl => FlinkMetricRegistry} import scala.concurrent.duration._ import scala.language.postfixOps @@ -74,7 +75,7 @@ abstract class ContaineredJobManager( submittedJobGraphs : SubmittedJobGraphStore, checkpointRecoveryFactory : CheckpointRecoveryFactory, jobRecoveryTimeout: FiniteDuration, - metricsRegistry: Option[FlinkMetricRegistry], + jobManagerMetricGroup: JobManagerMetricGroup, optRestAddress: Option[String]) extends JobManager( flinkConfiguration, @@ -91,7 +92,7 @@ abstract class ContaineredJobManager( submittedJobGraphs, checkpointRecoveryFactory, jobRecoveryTimeout, - metricsRegistry, + jobManagerMetricGroup, optRestAddress) { val jobPollingInterval: FiniteDuration http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 0435046..d40a0fd 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -34,7 +34,6 @@ import org.apache.flink.api.common.time.Time import 
org.apache.flink.configuration._ import org.apache.flink.core.fs.{FileSystem, Path} import org.apache.flink.core.io.InputSplitAssigner -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup import org.apache.flink.metrics.{Gauge, MetricGroup} import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} @@ -50,10 +49,10 @@ import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServic import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager} -import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.executiongraph._ -import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils} +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution +import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils} import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager} import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus} import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener @@ -66,20 +65,18 @@ import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.messages.Messages.Disconnect import org.apache.flink.runtime.messages.RegistrationMessages._ -import org.apache.flink.runtime.messages.{Acknowledge, StackTrace} import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat import 
org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState import org.apache.flink.runtime.messages.accumulators._ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint} import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _} +import org.apache.flink.runtime.messages.{Acknowledge, StackTrace} import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup -import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry} import org.apache.flink.runtime.metrics.util.MetricUtils -import org.apache.flink.runtime.net.SSLUtils +import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry} import org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered} import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation} -import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils} import org.apache.flink.runtime.taskexecutor.TaskExecutor import org.apache.flink.runtime.taskmanager.TaskManager @@ -137,7 +134,7 @@ class JobManager( protected val submittedJobGraphs : SubmittedJobGraphStore, protected val checkpointRecoveryFactory : CheckpointRecoveryFactory, protected val jobRecoveryTimeout: FiniteDuration, - protected val metricsRegistry: Option[FlinkMetricRegistry], + protected val jobManagerMetricGroup: JobManagerMetricGroup, protected val optRestAddress: Option[String]) extends FlinkActor with LeaderSessionMessageFilter // mixin oder is important, we want filtering after logging @@ -154,16 +151,6 @@ class JobManager( var leaderSessionID: Option[UUID] = None - protected val jobManagerMetricGroup : Option[JobManagerMetricGroup] = metricsRegistry match { - case 
Some(registry) => - val host = flinkConfiguration.getString(JobManagerOptions.ADDRESS) - Option(new JobManagerMetricGroup( - registry, NetUtils.unresolvedHostToNormalizedString(host))) - case None => - log.warn("Could not instantiate JobManager metrics.") - None - } - /** Futures which have to be completed before terminating the job manager */ var futuresToComplete: Option[Seq[Future[Unit]]] = None @@ -205,12 +192,7 @@ class JobManager( throw new RuntimeException("Could not start the submitted job graphs service.", e) } - jobManagerMetricGroup match { - case Some(group) => - instantiateMetrics(group) - case None => - log.warn("Could not instantiate JobManager metric group.") - } + instantiateMetrics(jobManagerMetricGroup) } override def postStop(): Unit = { @@ -250,6 +232,8 @@ class JobManager( archive ! decorateMessage(PoisonPill) } + jobManagerMetricGroup.close() + instanceManager.shutdown() scheduler.shutdown() libraryCacheManager.shutdown() @@ -260,13 +244,6 @@ class JobManager( case e: IOException => log.error("Could not properly shutdown the blob server.", e) } - // failsafe shutdown of the metrics registry - try { - metricsRegistry.foreach(_.shutdown()) - } catch { - case t: Exception => log.error("MetricRegistry did not shutdown properly.", t) - } - log.debug(s"Job manager ${self.path} is completely stopped.") } @@ -1073,9 +1050,9 @@ class JobManager( ) ) - case RequestTaskManagerInstance(instanceID) => + case RequestTaskManagerInstance(resourceId) => sender ! 
decorateMessage( - TaskManagerInstance(Option(instanceManager.getRegisteredInstanceById(instanceID))) + TaskManagerInstance(Option(instanceManager.getRegisteredInstance(resourceId))) ) case Heartbeat(instanceID, accumulators) => @@ -1283,15 +1260,7 @@ class JobManager( log.info(s"Using restart strategy $restartStrategy for $jobId.") - val jobMetrics = jobManagerMetricGroup match { - case Some(group) => - group.addJob(jobGraph) match { - case (jobGroup:Any) => jobGroup - case null => new UnregisteredMetricsGroup() - } - case None => - new UnregisteredMetricsGroup() - } + val jobMetrics = jobManagerMetricGroup.addJob(jobGraph) val numSlots = scheduler.getTotalNumberOfSlots() @@ -1791,7 +1760,7 @@ class JobManager( libraryCacheManager.unregisterJob(jobID) blobServer.cleanupJob(jobID) - jobManagerMetricGroup.foreach(_.removeJob(jobID)) + jobManagerMetricGroup.removeJob(jobID) futureOption } @@ -2042,7 +2011,12 @@ object JobManager { val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( configuration, ioExecutor, - AddressResolution.NO_ADDRESS_RESOLUTION); + AddressResolution.NO_ADDRESS_RESOLUTION) + + val metricRegistry = new MetricRegistryImpl( + MetricRegistryConfiguration.fromConfiguration(configuration)) + + metricRegistry.startQueryService(jobManagerSystem, null) val (_, _, webMonitorOption, _) = try { startJobManagerActors( @@ -2053,6 +2027,7 @@ object JobManager { futureExecutor, ioExecutor, highAvailabilityServices, + metricRegistry, classOf[JobManager], classOf[MemoryArchivist], Option(classOf[StandaloneResourceManager]) @@ -2085,6 +2060,13 @@ object JobManager { LOG.warn("Could not properly stop the high availability services.", t) } + try { + metricRegistry.shutdown() + } catch { + case t: Throwable => + LOG.warn("Could not properly shut down the metric registry.", t) + } + FlinkExecutors.gracefulShutdown( timeout.toMillis, TimeUnit.MILLISECONDS, @@ -2191,6 +2173,7 @@ object JobManager { futureExecutor: 
ScheduledExecutorService, ioExecutor: Executor, highAvailabilityServices: HighAvailabilityServices, + metricRegistry: FlinkMetricRegistry, jobManagerClass: Class[_ <: JobManager], archiveClass: Class[_ <: MemoryArchivist], resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]]) @@ -2231,6 +2214,7 @@ object JobManager { futureExecutor, ioExecutor, highAvailabilityServices, + metricRegistry, webMonitor.map(_.getRestAddress), jobManagerClass, archiveClass) @@ -2250,11 +2234,14 @@ object JobManager { if (executionMode == JobManagerMode.LOCAL) { LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode") + val resourceId = ResourceID.generate() + val taskManagerActor = TaskManager.startTaskManagerComponentsAndActor( configuration, - ResourceID.generate(), + resourceId, jobManagerSystem, highAvailabilityServices, + metricRegistry, externalHostname, Some(TaskExecutor.TASK_MANAGER_NAME), localTaskManagerCommunication = true, @@ -2433,7 +2420,8 @@ object JobManager { configuration: Configuration, futureExecutor: ScheduledExecutorService, ioExecutor: Executor, - blobStore: BlobStore) : + blobStore: BlobStore, + metricRegistry: FlinkMetricRegistry) : (InstanceManager, FlinkScheduler, BlobServer, @@ -2443,7 +2431,7 @@ object JobManager { Int, // number of archived jobs Option[Path], // archive path FiniteDuration, // timeout for job recovery - Option[FlinkMetricRegistry] + JobManagerMetricGroup ) = { val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration) @@ -2525,12 +2513,9 @@ object JobManager { } } - val metricRegistry = try { - Option(new FlinkMetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration))) - } catch { - case _: Exception => - None - } + val jobManagerMetricGroup = new JobManagerMetricGroup( + metricRegistry, + configuration.getString(JobManagerOptions.ADDRESS)) (instanceManager, scheduler, @@ -2541,7 +2526,7 @@ object JobManager { archiveCount, archivePath, jobRecoveryTimeout, - metricRegistry) + 
jobManagerMetricGroup) } /** @@ -2564,6 +2549,7 @@ object JobManager { futureExecutor: ScheduledExecutorService, ioExecutor: Executor, highAvailabilityServices: HighAvailabilityServices, + metricRegistry: FlinkMetricRegistry, optRestAddress: Option[String], jobManagerClass: Class[_ <: JobManager], archiveClass: Class[_ <: MemoryArchivist]) @@ -2575,6 +2561,7 @@ object JobManager { futureExecutor, ioExecutor, highAvailabilityServices, + metricRegistry, optRestAddress, Some(JobMaster.JOB_MANAGER_NAME), Some(JobMaster.ARCHIVE_NAME), @@ -2606,6 +2593,7 @@ object JobManager { futureExecutor: ScheduledExecutorService, ioExecutor: Executor, highAvailabilityServices: HighAvailabilityServices, + metricRegistry: FlinkMetricRegistry, optRestAddress: Option[String], jobManagerActorName: Option[String], archiveActorName: Option[String], @@ -2622,11 +2610,12 @@ object JobManager { archiveCount, archivePath, jobRecoveryTimeout, - metricsRegistry) = createJobManagerComponents( + jobManagerMetricGroup) = createJobManagerComponents( configuration, futureExecutor, ioExecutor, - highAvailabilityServices.createBlobStore()) + highAvailabilityServices.createBlobStore(), + metricRegistry) val archiveProps = getArchiveProps(archiveClass, archiveCount, archivePath) @@ -2653,7 +2642,7 @@ object JobManager { highAvailabilityServices.getSubmittedJobGraphStore(), highAvailabilityServices.getCheckpointRecoveryFactory(), jobRecoveryTimeout, - metricsRegistry, + jobManagerMetricGroup, optRestAddress) val jobManager: ActorRef = jobManagerActorName match { @@ -2661,12 +2650,6 @@ object JobManager { case None => actorSystem.actorOf(jobManagerProps) } - metricsRegistry match { - case Some(registry) => - registry.startQueryService(actorSystem, null) - case None => - } - (jobManager, archive) } @@ -2693,7 +2676,7 @@ object JobManager { submittedJobGraphStore: SubmittedJobGraphStore, checkpointRecoveryFactory: CheckpointRecoveryFactory, jobRecoveryTimeout: FiniteDuration, - metricsRegistry: 
Option[FlinkMetricRegistry], + jobManagerMetricGroup: JobManagerMetricGroup, optRestAddress: Option[String]): Props = { Props( @@ -2712,7 +2695,7 @@ object JobManager { submittedJobGraphStore, checkpointRecoveryFactory, jobRecoveryTimeout, - metricsRegistry, + jobManagerMetricGroup, optRestAddress) } } http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index 3e896ca..5c19c7a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -24,8 +24,9 @@ import java.util.UUID import akka.actor.ActorRef import org.apache.flink.api.common.JobID import org.apache.flink.runtime.akka.ListeningBehaviour -import org.apache.flink.runtime.blob.{PermanentBlobKey} +import org.apache.flink.runtime.blob.PermanentBlobKey import org.apache.flink.runtime.client.{JobStatusMessage, SerializedJobExecutionResult} +import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, ExecutionAttemptID, ExecutionGraph} import org.apache.flink.runtime.instance.{Instance, InstanceID} import org.apache.flink.runtime.io.network.partition.ResultPartitionID @@ -419,9 +420,9 @@ object JobManagerMessages { /** * Requests the [[Instance]] object of the task manager with the given instance ID * - * @param instanceID Instance ID of the task manager + * @param resourceId identifying the TaskManager which shall be retrieved */ - case class RequestTaskManagerInstance(instanceID: InstanceID) + case class RequestTaskManagerInstance(resourceId: ResourceID) 
/** * Returns the [[Instance]] object of the requested task manager. This is in response to http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index c152f4a..689d98f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph import org.apache.flink.runtime.jobmanager.HighAvailabilityMode import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService} import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager +import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl} import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware} import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever} import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} @@ -121,6 +122,9 @@ abstract class FlinkMiniCluster( Hardware.getNumberCPUCores(), new ExecutorThreadFactory("mini-cluster-io")) + protected val metricRegistry = new MetricRegistryImpl( + MetricRegistryConfiguration.fromConfiguration(originalConfiguration)) + def this(configuration: Configuration, useSingleActorSystem: Boolean) { this( configuration, @@ -325,6 +329,10 @@ abstract class FlinkMiniCluster( lazy val singleActorSystem = startJobManagerActorSystem(0) + if (originalConfiguration.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) { + 
metricRegistry.startQueryService(singleActorSystem, null) + } + val (jmActorSystems, jmActors) = (for(i <- 0 until numJobManagers) yield { val actorSystem = if(useSingleActorSystem) { http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index e22230e..e9bdb2a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -19,6 +19,7 @@ package org.apache.flink.runtime.minicluster import java.net.InetAddress +import java.util.UUID import java.util.concurrent.{Executor, ScheduledExecutorService} import akka.actor.{ActorRef, ActorSystem, Props} @@ -46,7 +47,8 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.messages.JobManagerMessages import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse} -import org.apache.flink.runtime.metrics.MetricRegistry +import org.apache.flink.runtime.metrics.groups.{JobManagerMetricGroup, TaskManagerMetricGroup} +import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl} import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration} import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} import org.apache.flink.runtime.util.EnvironmentInformation @@ -83,6 +85,12 @@ class LocalFlinkMiniCluster( def this(userConfiguration: Configuration) 
= this(userConfiguration, true) + override def startInternalShutdown() { + metricRegistry.shutdown() + + super.startInternalShutdown() + } + // -------------------------------------------------------------------------- override def generateConfiguration(userConfiguration: Configuration): Configuration = { @@ -137,23 +145,20 @@ class LocalFlinkMiniCluster( } val (instanceManager, - scheduler, - blobServer, - libraryCacheManager, - restartStrategyFactory, - timeout, - archiveCount, - archivePath, - jobRecoveryTimeout, - metricsRegistry) = JobManager.createJobManagerComponents( + scheduler, + blobServer, + libraryCacheManager, + restartStrategyFactory, + timeout, + archiveCount, + archivePath, + jobRecoveryTimeout, + jobManagerMetricGroup) = JobManager.createJobManagerComponents( config, futureExecutor, ioExecutor, - highAvailabilityServices.createBlobStore()) - - if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) { - metricsRegistry.get.startQueryService(system, null) - } + highAvailabilityServices.createBlobStore(), + metricRegistry) val archive = system.actorOf( getArchiveProps( @@ -180,7 +185,7 @@ class LocalFlinkMiniCluster( highAvailabilityServices.getSubmittedJobGraphStore(), highAvailabilityServices.getCheckpointRecoveryFactory(), jobRecoveryTimeout, - metricsRegistry, + jobManagerMetricGroup, optRestAddress), jobManagerName) } @@ -248,9 +253,8 @@ class LocalFlinkMiniCluster( val taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, - resourceID) - - val metricRegistry = taskManagerServices.getMetricRegistry() + resourceID, + metricRegistry) val props = getTaskManagerProps( taskManagerClass, @@ -260,7 +264,7 @@ class LocalFlinkMiniCluster( taskManagerServices.getMemoryManager(), taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment, - metricRegistry) + taskManagerServices.getTaskManagerMetricGroup) if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) { 
metricRegistry.startQueryService(system, resourceID) @@ -296,7 +300,7 @@ class LocalFlinkMiniCluster( submittedJobGraphStore: SubmittedJobGraphStore, checkpointRecoveryFactory: CheckpointRecoveryFactory, jobRecoveryTimeout: FiniteDuration, - metricsRegistry: Option[MetricRegistry], + jobManagerMetricGroup: JobManagerMetricGroup, optRestAddress: Option[String]) : Props = { @@ -316,7 +320,7 @@ class LocalFlinkMiniCluster( submittedJobGraphStore, checkpointRecoveryFactory, jobRecoveryTimeout, - metricsRegistry, + jobManagerMetricGroup, optRestAddress) } @@ -328,7 +332,7 @@ class LocalFlinkMiniCluster( memoryManager: MemoryManager, ioManager: IOManager, networkEnvironment: NetworkEnvironment, - metricsRegistry: MetricRegistry): Props = { + taskManagerMetricGroup: TaskManagerMetricGroup): Props = { TaskManager.getTaskManagerProps( taskManagerClass, @@ -339,7 +343,7 @@ class LocalFlinkMiniCluster( ioManager, networkEnvironment, highAvailabilityServices, - metricsRegistry) + taskManagerMetricGroup) } def getResourceManagerProps( http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index cc01a8d..f209dac 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -22,7 +22,7 @@ import java.io.{File, FileInputStream, IOException} import java.lang.management.ManagementFactory import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket} import java.util -import java.util.concurrent.{Callable, TimeUnit} +import java.util.concurrent.{Callable, TimeUnit, TimeoutException} import 
java.util.{Collections, UUID} import _root_.akka.actor._ @@ -63,8 +63,7 @@ import org.apache.flink.runtime.messages.TaskMessages._ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint} import org.apache.flink.runtime.messages.{Acknowledge, StackTraceSampleResponse} import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup -import org.apache.flink.runtime.metrics.util.MetricUtils -import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry} import org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils} import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration} @@ -127,7 +126,7 @@ class TaskManager( protected val network: NetworkEnvironment, protected val numberOfSlots: Int, protected val highAvailabilityServices: HighAvailabilityServices, - protected val metricsRegistry: FlinkMetricRegistry) + protected val taskManagerMetricGroup: TaskManagerMetricGroup) extends FlinkActor with LeaderSessionMessageFilter // Mixin order is important: We want to filter after logging with LogMessages // Mixin order is important: first we want to support message logging @@ -154,8 +153,6 @@ class TaskManager( getJobManagerLeaderRetriever( HighAvailabilityServices.DEFAULT_JOB_ID) - private var taskManagerMetricGroup : TaskManagerMetricGroup = _ - /** Actors which want to be notified once this task manager has been * registered at the job manager */ private val waitForRegistration = scala.collection.mutable.Set[ActorRef]() @@ -258,13 +255,8 @@ class TaskManager( } catch { case t: Exception => log.error("FileCache did not shutdown properly.", t) } - - // failsafe shutdown of the metrics registry - try { - 
metricsRegistry.shutdown() - } catch { - case t: Exception => log.error("MetricRegistry did not shutdown properly.", t) - } + + taskManagerMetricGroup.close() log.info(s"Task manager ${self.path} is completely shut down.") } @@ -980,12 +972,6 @@ class TaskManager( throw new RuntimeException(message, e) } - taskManagerMetricGroup = - new TaskManagerMetricGroup(metricsRegistry, location.getHostname, id.toString) - - MetricUtils.instantiateStatusMetrics(taskManagerMetricGroup) - MetricUtils.instantiateNetworkMetrics(taskManagerMetricGroup, network) - // watch job manager to detect when it dies context.watch(jobManager) @@ -1832,15 +1818,22 @@ object TaskManager { actorSystemPort, LOG.logger) + val metricRegistry = new MetricRegistryImpl( + MetricRegistryConfiguration.fromConfiguration(configuration)) + + metricRegistry.startQueryService(taskManagerSystem, resourceID) + // start all the TaskManager services (network stack, library cache, ...) // and the TaskManager actor try { + LOG.info("Starting TaskManager actor") val taskManager = startTaskManagerComponentsAndActor( configuration, resourceID, taskManagerSystem, highAvailabilityServices, + metricRegistry, taskManagerHostname, Some(TaskExecutor.TASK_MANAGER_NAME), localTaskManagerCommunication = false, @@ -1893,6 +1886,9 @@ object TaskManager { } throw t } + + // shut down the metric query service + metricRegistry.shutdown() } /** @@ -1984,6 +1980,7 @@ object TaskManager { resourceID: ResourceID, actorSystem: ActorSystem, highAvailabilityServices: HighAvailabilityServices, + metricRegistry: FlinkMetricRegistry, taskManagerHostname: String, taskManagerActorName: Option[String], localTaskManagerCommunication: Boolean, @@ -1999,9 +1996,8 @@ object TaskManager { val taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, - resourceID) - - val metricRegistry = taskManagerServices.getMetricRegistry() + resourceID, + metricRegistry) // create the actor properties (which define the 
actor constructor parameters) val tmProps = getTaskManagerProps( @@ -2013,9 +2009,7 @@ object TaskManager { taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment(), highAvailabilityServices, - metricRegistry) - - metricRegistry.startQueryService(actorSystem, resourceID) + taskManagerServices.getTaskManagerMetricGroup) taskManagerActorName match { case Some(actorName) => actorSystem.actorOf(tmProps, actorName) @@ -2032,7 +2026,7 @@ object TaskManager { ioManager: IOManager, networkEnvironment: NetworkEnvironment, highAvailabilityServices: HighAvailabilityServices, - metricsRegistry: FlinkMetricRegistry + taskManagerMetricGroup: TaskManagerMetricGroup ): Props = { Props( taskManagerClass, @@ -2044,7 +2038,7 @@ object TaskManager { networkEnvironment, taskManagerConfig.getNumberSlots(), highAvailabilityServices, - metricsRegistry) + taskManagerMetricGroup) } // --------------------------------------------------------------------------
