[FLINK-8670] Make MetricRegistryImpl#shutdown non-blocking This commit makes the MetricRegistryImpl#shutdown method non-blocking. Instead of waiting for the completion of the shutdown procedure, the method returns a future which is completed once the metric registry has completed the shutdown.
This closes #5504. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e29ec0fb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e29ec0fb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e29ec0fb Branch: refs/heads/master Commit: e29ec0fbd2cb03a42b98142f63ce73b97dc2e915 Parents: d9b28e8 Author: Till Rohrmann <[email protected]> Authored: Fri Feb 16 08:12:04 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Feb 23 18:22:09 2018 +0100 ---------------------------------------------------------------------- .../org/apache/flink/util/ExecutorUtils.java | 21 ++++ .../MesosApplicationMasterRunner.java | 2 +- .../ScheduledDropwizardReporterTest.java | 4 +- .../DropwizardFlinkHistogramWrapperTest.java | 2 +- .../flink/metrics/jmx/JMXReporterTest.java | 8 +- .../PrometheusReporterTaskScopeTest.java | 4 +- .../prometheus/PrometheusReporterTest.java | 20 +-- .../flink/metrics/slf4j/Slf4jReporterTest.java | 4 +- .../metrics/statsd/StatsDReporterTest.java | 8 +- .../apache/flink/runtime/akka/ActorUtils.java | 89 ++++++++++++++ .../runtime/metrics/MetricRegistryImpl.java | 122 ++++++++++--------- .../flink/runtime/jobmanager/JobManager.scala | 2 +- .../runtime/minicluster/FlinkMiniCluster.scala | 2 +- .../flink/runtime/taskmanager/TaskManager.scala | 2 +- .../runtime/metrics/MetricRegistryImplTest.java | 42 +++---- .../runtime/metrics/TaskManagerMetricsTest.java | 2 +- .../metrics/groups/AbstractMetricGroupTest.java | 10 +- .../metrics/groups/JobManagerGroupTest.java | 16 +-- .../metrics/groups/JobManagerJobGroupTest.java | 12 +- .../groups/MetricGroupRegistrationTest.java | 8 +- .../runtime/metrics/groups/MetricGroupTest.java | 4 +- .../metrics/groups/OperatorGroupTest.java | 37 +++--- .../metrics/groups/TaskManagerGroupTest.java | 30 +++-- .../metrics/groups/TaskManagerJobGroupTest.java | 28 +++-- .../metrics/groups/TaskMetricGroupTest.java | 34 ++++-- 
.../flink/yarn/YarnApplicationMasterRunner.java | 2 +- 26 files changed, 334 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java index d98bdd2..0a7f161 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java @@ -21,6 +21,7 @@ package org.apache.flink.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -74,4 +75,24 @@ public class ExecutorUtils { } } } + + /** + * Shuts the given {@link ExecutorService ExecutorServices} down in a non-blocking fashion. The shutdown will + * be executed by a thread from the common fork-join pool; that thread may block for up to the given timeout. + * + * <p>The executor services will be shut down gracefully for the given timeout period. Afterwards + * {@link ExecutorService#shutdownNow()} will be called. + * + * @param timeout before {@link ExecutorService#shutdownNow()} is called + * @param unit time unit of the timeout + * @param executorServices to shut down + * @return Future which is completed once all given {@link ExecutorService ExecutorServices} are shut down + */ + public static CompletableFuture<Void> nonBlockingShutdown(long timeout, TimeUnit unit, ExecutorService...
executorServices) { + return CompletableFuture.supplyAsync( + () -> { + gracefulShutdown(timeout, unit, executorServices); + return null; + }); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 94804ac..630fa83 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -433,7 +433,7 @@ public class MesosApplicationMasterRunner { if (metricRegistry != null) { try { - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); } catch (Throwable t) { LOG.error("Could not shut down metric registry.", t); } http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java index 4a2ca3a..b69b8d8 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java @@ -74,7 +74,7 @@ public class ScheduledDropwizardReporterTest { * Tests that the registered metrics' 
names don't contain invalid characters. */ @Test - public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessException { + public void testAddingMetrics() throws Exception { Configuration configuration = new Configuration(); String taskName = "test\"Ta\"..sk"; String jobName = "testJ\"ob:-!ax..?"; @@ -131,7 +131,7 @@ public class ScheduledDropwizardReporterTest { assertEquals(expectedCounterName, counters.get(myCounter)); - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java index fb21a75b..d23a22c 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java @@ -150,7 +150,7 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger { assertEquals(0, testingReporter.getMetrics().size()); } finally { if (registry != null) { - registry.shutdown(); + registry.shutdown().get(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java index 40b7f15..6e45646 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java @@ -145,7 +145,7 @@ public class JMXReporterTest extends TestLogger { rep1.notifyOfRemovedMetric(g2, "rep2", null); mg.close(); - reg.shutdown(); + reg.shutdown().get(); } /** @@ -219,7 +219,7 @@ public class JMXReporterTest extends TestLogger { rep1.close(); rep2.close(); mg.close(); - reg.shutdown(); + reg.shutdown().get(); } /** @@ -266,7 +266,7 @@ public class JMXReporterTest extends TestLogger { } finally { if (registry != null) { - registry.shutdown(); + registry.shutdown().get(); } } } @@ -306,7 +306,7 @@ public class JMXReporterTest extends TestLogger { } finally { if (registry != null) { - registry.shutdown(); + registry.shutdown().get(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java index d4ad1f9..724a79b 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java @@ -90,9 +90,9 @@ public class PrometheusReporterTaskScopeTest { } @After - public void shutdownRegistry() { + public void shutdownRegistry() throws Exception { if (registry != null) { - 
registry.shutdown(); + registry.shutdown().get(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java index 6833a06..e9fd985 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java @@ -84,9 +84,9 @@ public class PrometheusReporterTest extends TestLogger { } @After - public void shutdownRegistry() { + public void shutdownRegistry() throws Exception { if (registry != null) { - registry.shutdown(); + registry.shutdown().get(); } } @@ -237,7 +237,7 @@ public class PrometheusReporterTest extends TestLogger { } @Test - public void cannotStartTwoReportersOnSamePort() { + public void cannotStartTwoReportersOnSamePort() throws Exception { final MetricRegistryImpl fixedPort1 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", portRangeProvider.next()))); assertThat(fixedPort1.getReporters(), hasSize(1)); @@ -246,12 +246,12 @@ public class PrometheusReporterTest extends TestLogger { final MetricRegistryImpl fixedPort2 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", String.valueOf(firstReporter.getPort())))); assertThat(fixedPort2.getReporters(), hasSize(0)); - fixedPort1.shutdown(); - fixedPort2.shutdown(); + fixedPort1.shutdown().get(); + fixedPort2.shutdown().get(); } @Test - public void canStartTwoReportersWhenUsingPortRange() 
{ + public void canStartTwoReportersWhenUsingPortRange() throws Exception { String portRange = portRangeProvider.next(); final MetricRegistryImpl portRange1 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", portRange))); final MetricRegistryImpl portRange2 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", portRange))); @@ -259,8 +259,8 @@ public class PrometheusReporterTest extends TestLogger { assertThat(portRange1.getReporters(), hasSize(1)); assertThat(portRange2.getReporters(), hasSize(1)); - portRange1.shutdown(); - portRange2.shutdown(); + portRange1.shutdown().get(); + portRange2.shutdown().get(); } private String addMetricAndPollResponse(Metric metric, String metricName) throws UnirestException { @@ -280,8 +280,8 @@ public class PrometheusReporterTest extends TestLogger { } @After - public void closeReporterAndShutdownRegistry() { - registry.shutdown(); + public void closeReporterAndShutdownRegistry() throws Exception { + registry.shutdown().get(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java index b344f45..172c79c 100644 --- a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java +++ b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java @@ -76,8 +76,8 @@ public class Slf4jReporterTest extends TestLogger { } @AfterClass - public static void tearDown() { - registry.shutdown(); + public static void tearDown() throws Exception { + 
registry.shutdown().get(); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java index 08d4998..c9f5af0 100644 --- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java +++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java @@ -73,7 +73,7 @@ public class StatsDReporterTest extends TestLogger { * Tests that the registered metrics' names don't contain invalid characters. */ @Test - public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessException { + public void testAddingMetrics() throws Exception { Configuration configuration = new Configuration(); String taskName = "testTask"; String jobName = "testJob:-!ax..?"; @@ -124,7 +124,7 @@ public class StatsDReporterTest extends TestLogger { assertEquals(expectedCounterName, counters.get(myCounter)); - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); } /** @@ -187,7 +187,7 @@ public class StatsDReporterTest extends TestLogger { } finally { if (registry != null) { - registry.shutdown(); + registry.shutdown().get(); } if (receiver != null) { @@ -247,7 +247,7 @@ public class StatsDReporterTest extends TestLogger { } finally { if (registry != null) { - registry.shutdown(); + registry.shutdown().get(); } if (receiver != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java new file mode 100644 index 0000000..f2f9059 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.akka; + +import org.apache.flink.runtime.concurrent.FutureUtils; + +import akka.actor.ActorRef; +import akka.actor.Kill; +import akka.pattern.Patterns; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +/** + * Utility functions for the interaction with Akka {@link akka.actor.Actor}. + */ +public class ActorUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class); + + /** + * Shuts the given {@link akka.actor.Actor} down in a non blocking fashion. 
The method first tries to + * gracefully shut them down. If this is not successful, then the actors will be terminated by sending + * a {@link akka.actor.Kill} message. + * + * @param gracePeriod for the graceful shutdown + * @param timeUnit time unit of the grace period + * @param actors to shut down + * @return Future which is completed once all actors have been shut down gracefully or forceful + * kill messages have been sent to all actors. Occurring errors will be suppressed into one error. + */ + public static CompletableFuture<Void> nonBlockingShutDown(long gracePeriod, TimeUnit timeUnit, ActorRef... actors) { + final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(actors.length); + final FiniteDuration timeout = new FiniteDuration(gracePeriod, timeUnit); + + for (ActorRef actor : actors) { + try { + final Future<Boolean> booleanFuture = Patterns.gracefulStop(actor, timeout); + final CompletableFuture<Void> terminationFuture = FutureUtils.toJava(booleanFuture) + .<Void>thenApply(ignored -> null) + .exceptionally((Throwable throwable) -> { + if (throwable instanceof TimeoutException) { + // the actor did not gracefully stop within the grace period --> Let's kill him + actor.tell(Kill.getInstance(), ActorRef.noSender()); + return null; + } else { + throw new CompletionException(throwable); + } + }); + + terminationFutures.add(terminationFuture); + } catch (IllegalStateException ignored) { + // this can happen if the underlying actor system has been stopped before shutting + // the actor down + LOG.debug("The actor {} has already been stopped because the " + + "underlying ActorSystem has already been shut down.", actor.path()); + } + } + + return FutureUtils.completeAll(terminationFutures); + } + + private ActorUtils() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java ---------------------------------------------------------------------- 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java index c8f4490..6b37709 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -28,35 +29,36 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.View; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.runtime.akka.ActorUtils; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; import org.apache.flink.runtime.metrics.scope.ScopeFormats; import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.actor.Kill; -import akka.pattern.Patterns; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.TimerTask; +import 
java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - /** * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}. @@ -66,8 +68,14 @@ public class MetricRegistryImpl implements MetricRegistry { private final Object lock = new Object(); - private List<MetricReporter> reporters; - private ScheduledExecutorService executor; + private final List<MetricReporter> reporters; + private final ScheduledExecutorService executor; + + private final ScopeFormats scopeFormats; + private final char globalDelimiter; + private final List<Character> delimiters; + + private final CompletableFuture<Void> terminationFuture; @Nullable private ActorRef queryService; @@ -77,9 +85,7 @@ public class MetricRegistryImpl implements MetricRegistry { private ViewUpdater viewUpdater; - private final ScopeFormats scopeFormats; - private final char globalDelimiter; - private final List<Character> delimiters = new ArrayList<>(); + private boolean isShutdown; /** * Creates a new MetricRegistry and starts the configured reporter. 
@@ -87,9 +93,12 @@ public class MetricRegistryImpl implements MetricRegistry { public MetricRegistryImpl(MetricRegistryConfiguration config) { this.scopeFormats = config.getScopeFormats(); this.globalDelimiter = config.getDelimiter(); + this.delimiters = new ArrayList<>(10); + this.terminationFuture = new CompletableFuture<>(); + this.isShutdown = false; // second, instantiate any custom configured reporters - this.reporters = new ArrayList<>(); + this.reporters = new ArrayList<>(4); List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations(); @@ -226,71 +235,72 @@ public class MetricRegistryImpl implements MetricRegistry { */ public boolean isShutdown() { synchronized (lock) { - return reporters == null && executor.isShutdown(); + return isShutdown; } } /** * Shuts down this registry and the associated {@link MetricReporter}. + * + * <p>NOTE: This operation is asynchronous and returns a future which is completed + * once the shutdown operation has been completed. + * + * @return Future which is completed once the {@link MetricRegistryImpl} + * is shut down. 
*/ - public void shutdown() { + public CompletableFuture<Void> shutdown() { synchronized (lock) { - Future<Boolean> stopFuture = null; - FiniteDuration stopTimeout = null; + if (isShutdown) { + return terminationFuture; + } else { + isShutdown = true; + final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3); + final Time gracePeriod = Time.seconds(1L); - if (queryService != null) { - stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); + if (queryService != null) { + final CompletableFuture<Void> queryServiceTerminationFuture = ActorUtils.nonBlockingShutDown( + gracePeriod.toMilliseconds(), + TimeUnit.MILLISECONDS, + queryService); - try { - stopFuture = Patterns.gracefulStop(queryService, stopTimeout); - } catch (IllegalStateException ignored) { - // this can happen if the underlying actor system has been stopped before shutting - // the metric registry down - // TODO: Pull the MetricQueryService actor out of the MetricRegistry - LOG.debug("The metric query service actor has already been stopped because the " + - "underlying ActorSystem has already been shut down."); + terminationFutures.add(queryServiceTerminationFuture); } - } - if (reporters != null) { + Throwable throwable = null; for (MetricReporter reporter : reporters) { try { reporter.close(); } catch (Throwable t) { - LOG.warn("Metrics reporter did not shut down cleanly", t); + throwable = ExceptionUtils.firstOrSuppressed(t, throwable); } } - reporters = null; - } - shutdownExecutor(); - - if (stopFuture != null) { - boolean stopped = false; - - try { - stopped = Await.result(stopFuture, stopTimeout); - } catch (Exception e) { - LOG.warn("Query actor did not properly stop.", e); - } + reporters.clear(); - if (!stopped) { - // the query actor did not stop in time, let's kill him - queryService.tell(Kill.getInstance(), ActorRef.noSender()); + if (throwable != null) { + terminationFutures.add( + FutureUtils.completedExceptionally( + new FlinkException("Could not shut down the 
metric reporters properly.", throwable))); } - } - } - } - private void shutdownExecutor() { - if (executor != null) { - executor.shutdown(); + final CompletableFuture<Void> executorShutdownFuture = ExecutorUtils.nonBlockingShutdown( + gracePeriod.toMilliseconds(), + TimeUnit.MILLISECONDS, + executor); + + terminationFutures.add(executorShutdownFuture); + + FutureUtils + .completeAll(terminationFutures) + .whenComplete( + (Void ignored, Throwable error) -> { + if (error != null) { + terminationFuture.completeExceptionally(error); + } else { + terminationFuture.complete(null); + } + }); - try { - if (!executor.awaitTermination(1L, TimeUnit.SECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException e) { - executor.shutdownNow(); + return terminationFuture; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index d0e401c..1dfaa5d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -2054,7 +2054,7 @@ object JobManager { } try { - metricRegistry.shutdown() + metricRegistry.shutdown().get() } catch { case t: Throwable => LOG.warn("Could not properly shut down the metric registry.", t) http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 7948ba1..6c9ee5b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -468,7 +468,7 @@ abstract class FlinkMiniCluster( Await.ready(Future.sequence(jmFutures ++ tmFutures ++ rmFutures), timeout) - metricRegistryOpt.foreach(_.shutdown()) + metricRegistryOpt.foreach(_.shutdown().get()) if (!useSingleActorSystem) { taskManagerActorSystems foreach { http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 106bea1..485add5 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1900,7 +1900,7 @@ object TaskManager { // shut down the metric query service try { - metricRegistry.shutdown() + metricRegistry.shutdown().get() } catch { case t: Throwable => LOG.error("Could not properly shut down the metric registry.", t) http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java index 2eccc0c..adb622d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java @@ -59,12 +59,12 @@ public class MetricRegistryImplTest extends TestLogger { private static final char GLOBAL_DEFAULT_DELIMITER = '.'; @Test - public void testIsShutdown() { + public void testIsShutdown() throws Exception { MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); Assert.assertFalse(metricRegistry.isShutdown()); - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); Assert.assertTrue(metricRegistry.isShutdown()); } @@ -73,7 +73,7 @@ public class MetricRegistryImplTest extends TestLogger { * Verifies that the reporter name list is correctly used to determine which reporters should be instantiated. */ @Test - public void testReporterInclusion() { + public void testReporterInclusion() throws Exception { Configuration config = new Configuration(); config.setString(MetricOptions.REPORTERS_LIST, "test"); @@ -87,7 +87,7 @@ public class MetricRegistryImplTest extends TestLogger { Assert.assertTrue(TestReporter1.wasOpened); Assert.assertFalse(TestReporter11.wasOpened); - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); } /** @@ -106,7 +106,7 @@ public class MetricRegistryImplTest extends TestLogger { * Verifies that multiple reporters are instantiated correctly. */ @Test - public void testMultipleReporterInstantiation() { + public void testMultipleReporterInstantiation() throws Exception { Configuration config = new Configuration(); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName()); @@ -121,7 +121,7 @@ public class MetricRegistryImplTest extends TestLogger { Assert.assertTrue(TestReporter12.wasOpened); Assert.assertTrue(TestReporter13.wasOpened); - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); } /** @@ -164,14 +164,14 @@ public class MetricRegistryImplTest extends TestLogger { * Verifies that configured arguments are properly forwarded to the reporter. */ @Test - public void testReporterArgumentForwarding() { + public void testReporterArgumentForwarding() throws Exception { Configuration config = new Configuration(); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world"); - new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)).shutdown(); + new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)).shutdown().get(); Assert.assertEquals("hello", TestReporter2.mc.getString("arg1", null)); Assert.assertEquals("world", TestReporter2.mc.getString("arg2", null)); @@ -190,11 +190,9 @@ public class MetricRegistryImplTest extends TestLogger { /** * Verifies that reporters implementing the Scheduled interface are regularly called to report the metrics. - * - * @throws InterruptedException */ @Test - public void testReporterScheduling() throws InterruptedException { + public void testReporterScheduling() throws Exception { Configuration config = new Configuration(); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName()); @@ -225,7 +223,7 @@ public class MetricRegistryImplTest extends TestLogger { } Assert.assertTrue("No report was triggered.", TestReporter3.reportCount > 0); - registry.shutdown(); + registry.shutdown().get(); } /** @@ -244,7 +242,7 @@ public class MetricRegistryImplTest extends TestLogger { * Verifies that reporters are notified of added/removed metrics. */ @Test - public void testReporterNotifications() { + public void testReporterNotifications() throws Exception { Configuration config = new Configuration(); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName()); @@ -268,7 +266,7 @@ public class MetricRegistryImplTest extends TestLogger { assertTrue(TestReporter7.removedMetric instanceof Counter); assertEquals("rootCounter", TestReporter7.removedMetricName); - registry.shutdown(); + registry.shutdown().get(); } /** @@ -338,7 +336,7 @@ public class MetricRegistryImplTest extends TestLogger { } @Test - public void testConfigurableDelimiter() { + public void testConfigurableDelimiter() throws Exception { Configuration config = new Configuration(); config.setString(MetricOptions.SCOPE_DELIMITER, "_"); config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E"); @@ -348,11 +346,11 @@ public class MetricRegistryImplTest extends TestLogger { TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id"); assertEquals("A_B_C_D_E_name", tmGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testConfigurableDelimiterForReporters() { + public void testConfigurableDelimiterForReporters() throws Exception { Configuration config = new Configuration(); 
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); @@ -370,11 +368,11 @@ public class MetricRegistryImplTest extends TestLogger { assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(3)); assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(-1)); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testConfigurableDelimiterForReportersInGroup() { + public void testConfigurableDelimiterForReportersInGroup() throws Exception { Configuration config = new Configuration(); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); @@ -395,7 +393,7 @@ public class MetricRegistryImplTest extends TestLogger { TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id"); group.counter("C"); group.close(); - registry.shutdown(); + registry.shutdown().get(); assertEquals(4, TestReporter8.numCorrectDelimitersForRegister); assertEquals(4, TestReporter8.numCorrectDelimitersForUnregister); } @@ -415,7 +413,7 @@ public class MetricRegistryImplTest extends TestLogger { ActorRef queryServiceActor = registry.getQueryService(); - registry.shutdown(); + registry.shutdown().get(); try { Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout), timeout); @@ -471,7 +469,7 @@ public class MetricRegistryImplTest extends TestLogger { assertEquals(metric, TestReporter7.removedMetric); assertEquals("counter", TestReporter7.removedMetricName); - registry.shutdown(); + registry.shutdown().get(); } /** 
http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java index fe22095..1798851 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java @@ -162,7 +162,7 @@ public class TaskManagerMetricsTest extends TestLogger { highAvailabilityServices.closeAndCleanupAllData(); } - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java index 325982b..f8ed3c6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java @@ -44,7 +44,7 @@ public class AbstractMetricGroupTest { * called and the parent is null. 
*/ @Test - public void testGetAllVariables() { + public void testGetAllVariables() throws Exception { MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); AbstractMetricGroup group = new AbstractMetricGroup<AbstractMetricGroup<?>>(registry, new String[0], null) { @@ -60,7 +60,7 @@ public class AbstractMetricGroupTest { }; assertTrue(group.getAllVariables().isEmpty()); - registry.shutdown(); + registry.shutdown().get(); } // ======================================================================== @@ -101,7 +101,7 @@ public class AbstractMetricGroupTest { } } } finally { - testRegistry.shutdown(); + testRegistry.shutdown().get(); } } @@ -176,7 +176,7 @@ public class AbstractMetricGroupTest { } @Test - public void testScopeGenerationWithoutReporters() { + public void testScopeGenerationWithoutReporters() throws Exception { Configuration config = new Configuration(); config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D"); MetricRegistryImpl testRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); @@ -193,7 +193,7 @@ public class AbstractMetricGroupTest { assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B, -1)); assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B, 2)); } finally { - testRegistry.shutdown(); + testRegistry.shutdown().get(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java index 05a72ac..cb5ec67 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java @@ -44,7 +44,7 @@ public class JobManagerGroupTest extends TestLogger { // ------------------------------------------------------------------------ @Test - public void addAndRemoveJobs() { + public void addAndRemoveJobs() throws Exception { MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); @@ -72,11 +72,11 @@ public class JobManagerGroupTest extends TestLogger { assertTrue(jmJobGroup21.isClosed()); assertEquals(0, group.numRegisteredJobMetricGroups()); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testCloseClosesAll() { + public void testCloseClosesAll() throws Exception { MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); @@ -94,7 +94,7 @@ public class JobManagerGroupTest extends TestLogger { assertTrue(jmJobGroup11.isClosed()); assertTrue(jmJobGroup21.isClosed()); - registry.shutdown(); + registry.shutdown().get(); } // ------------------------------------------------------------------------ @@ -102,18 +102,18 @@ public class JobManagerGroupTest extends TestLogger { // ------------------------------------------------------------------------ @Test - public void testGenerateScopeDefault() { + public void testGenerateScopeDefault() throws Exception { MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents()); assertEquals("localhost.jobmanager.name", group.getMetricIdentifier("name")); - registry.shutdown(); + 
registry.shutdown().get(); } @Test - public void testGenerateScopeCustom() { + public void testGenerateScopeCustom() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_JM, "constant.<host>.foo.<host>"); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); @@ -123,7 +123,7 @@ public class JobManagerGroupTest extends TestLogger { assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents()); assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java index 4373f80..6f4751b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java @@ -38,7 +38,7 @@ import static org.junit.Assert.assertEquals; public class JobManagerJobGroupTest extends TestLogger { @Test - public void testGenerateScopeDefault() { + public void testGenerateScopeDefault() throws Exception { MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName"); @@ -52,11 +52,11 @@ public class JobManagerJobGroupTest extends TestLogger { "theHostName.jobmanager.myJobName.name", jmGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void 
testGenerateScopeCustom() { + public void testGenerateScopeCustom() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc"); cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "some-constant.<job_name>"); @@ -75,11 +75,11 @@ public class JobManagerJobGroupTest extends TestLogger { "some-constant.myJobName.name", jmGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testGenerateScopeCustomWildcard() { + public void testGenerateScopeCustomWildcard() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter"); cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "*.some-constant.<job_id>"); @@ -98,7 +98,7 @@ public class JobManagerJobGroupTest extends TestLogger { "peter.some-constant." + jid + ".name", jmGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java index bcdcd63..22148db 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java @@ -44,7 +44,7 @@ public class MetricGroupRegistrationTest extends TestLogger { * Verifies that group methods instantiate the correct metric with the given name. 
*/ @Test - public void testMetricInstantiation() { + public void testMetricInstantiation() throws Exception { Configuration config = new Configuration(); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); @@ -85,7 +85,7 @@ public class MetricGroupRegistrationTest extends TestLogger { Assert.assertEquals(histogram, TestReporter1.lastPassedMetric); assertEquals("histogram", TestReporter1.lastPassedName); - registry.shutdown(); + registry.shutdown().get(); } /** @@ -107,7 +107,7 @@ public class MetricGroupRegistrationTest extends TestLogger { * Verifies that when attempting to create a group with the name of an existing one the existing one will be returned instead. */ @Test - public void testDuplicateGroupName() { + public void testDuplicateGroupName() throws Exception { Configuration config = new Configuration(); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); @@ -119,6 +119,6 @@ public class MetricGroupRegistrationTest extends TestLogger { MetricGroup group3 = root.addGroup("group"); Assert.assertTrue(group1 == group2 && group2 == group3); - registry.shutdown(); + registry.shutdown().get(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java index 4dc5edf..71ae7f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java @@ -62,8 +62,8 @@ public class MetricGroupTest extends TestLogger { } @After - public void 
shutdownRegistry() { - this.registry.shutdown(); + public void shutdownRegistry() throws Exception { + this.registry.shutdown().get(); this.registry = null; } http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java index 820b73e..58198e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java @@ -32,6 +32,8 @@ import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.AbstractID; import org.apache.flink.util.TestLogger; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.util.Map; @@ -45,10 +47,22 @@ import static org.junit.Assert.assertNotNull; */ public class OperatorGroupTest extends TestLogger { - @Test - public void testGenerateScopeDefault() { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + private MetricRegistryImpl registry; + + @Before + public void setup() { + registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + } + + @After + public void teardown() throws Exception { + if (registry != null) { + registry.shutdown().get(); + } + } + @Test + public void testGenerateScopeDefault() throws Exception { TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); TaskMetricGroup taskGroup = new TaskMetricGroup( @@ -62,12 
+76,10 @@ public class OperatorGroupTest extends TestLogger { assertEquals( "theHostName.taskmanager.test-tm-id.myJobName.myOpName.11.name", opGroup.getMetricIdentifier("name")); - - registry.shutdown(); } @Test - public void testGenerateScopeCustom() { + public void testGenerateScopeCustom() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "<tm_id>.<job_id>.<task_id>.<operator_name>.<operator_id>"); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); @@ -91,14 +103,12 @@ public class OperatorGroupTest extends TestLogger { String.format("%s.%s.%s.%s.%s.name", tmID, jid, vertexId, operatorName, operatorID), operatorGroup.getMetricIdentifier("name")); } finally { - registry.shutdown(); + registry.shutdown().get(); } } @Test - public void testIOMetricGroupInstantiation() { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - + public void testIOMetricGroupInstantiation() throws Exception { TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); TaskMetricGroup taskGroup = new TaskMetricGroup( @@ -108,14 +118,10 @@ public class OperatorGroupTest extends TestLogger { assertNotNull(opGroup.getIOMetricGroup()); assertNotNull(opGroup.getIOMetricGroup().getNumRecordsInCounter()); assertNotNull(opGroup.getIOMetricGroup().getNumRecordsOutCounter()); - - registry.shutdown(); } @Test public void testVariables() { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - JobID jid = new JobID(); JobVertexID tid = new JobVertexID(); AbstractID eid = new AbstractID(); @@ -140,8 +146,6 @@ public class OperatorGroupTest extends TestLogger { testVariable(variables, 
ScopeFormat.SCOPE_TASK_ATTEMPT_NUM, "0"); testVariable(variables, ScopeFormat.SCOPE_OPERATOR_ID, oid.toString()); testVariable(variables, ScopeFormat.SCOPE_OPERATOR_NAME, "myOpName"); - - registry.shutdown(); } private static void testVariable(Map<String, String> variables, String key, String expectedValue) { @@ -156,7 +160,6 @@ public class OperatorGroupTest extends TestLogger { JobVertexID vid = new JobVertexID(); AbstractID eid = new AbstractID(); OperatorID oid = new OperatorID(); - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5); http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java index 3272f73..addbcbd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java @@ -30,6 +30,8 @@ import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.AbstractID; import org.apache.flink.util.TestLogger; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.io.IOException; @@ -44,14 +46,26 @@ import static org.junit.Assert.assertTrue; */ public class TaskManagerGroupTest extends TestLogger { + private MetricRegistryImpl registry; + + @Before + public void setup() { + 
registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + } + + @After + public void teardown() throws Exception { + if (registry != null) { + registry.shutdown().get(); + } + } + // ------------------------------------------------------------------------ // adding and removing jobs // ------------------------------------------------------------------------ @Test public void addAndRemoveJobs() throws IOException { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - final TaskManagerMetricGroup group = new TaskManagerMetricGroup( registry, "localhost", new AbstractID().toString()); @@ -106,13 +120,10 @@ public class TaskManagerGroupTest extends TestLogger { assertTrue(tmGroup13.parent().isClosed()); assertEquals(0, group.numRegisteredJobMetricGroups()); - - registry.shutdown(); } @Test public void testCloseClosesAll() throws IOException { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); final TaskManagerMetricGroup group = new TaskManagerMetricGroup( registry, "localhost", new AbstractID().toString()); @@ -142,8 +153,6 @@ public class TaskManagerGroupTest extends TestLogger { assertTrue(tmGroup11.isClosed()); assertTrue(tmGroup12.isClosed()); assertTrue(tmGroup21.isClosed()); - - registry.shutdown(); } // ------------------------------------------------------------------------ @@ -152,16 +161,14 @@ public class TaskManagerGroupTest extends TestLogger { @Test public void testGenerateScopeDefault() { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "localhost", "id"); assertArrayEquals(new String[]{"localhost", "taskmanager", "id"}, group.getScopeComponents()); assertEquals("localhost.taskmanager.id.name", group.getMetricIdentifier("name")); - 
registry.shutdown(); } @Test - public void testGenerateScopeCustom() { + public void testGenerateScopeCustom() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_TM, "constant.<host>.foo.<host>"); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); @@ -169,12 +176,11 @@ public class TaskManagerGroupTest extends TestLogger { assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents()); assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test public void testCreateQueryServiceMetricInfo() { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); QueryScopeInfo.TaskManagerQueryScopeInfo info = tm.createQueryServiceMetricInfo(new DummyCharacterFilter()); http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java index b6be31c..52ee578 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java @@ -27,6 +27,8 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.TestLogger; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import static 
org.junit.Assert.assertArrayEquals; @@ -37,10 +39,22 @@ import static org.junit.Assert.assertEquals; */ public class TaskManagerJobGroupTest extends TestLogger { + private MetricRegistryImpl registry; + + @Before + public void setup() { + registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + } + + @After + public void teardown() throws Exception { + if (registry != null) { + registry.shutdown().get(); + } + } + @Test public void testGenerateScopeDefault() { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); @@ -51,11 +65,10 @@ public class TaskManagerJobGroupTest extends TestLogger { assertEquals( "theHostName.taskmanager.test-tm-id.myJobName.name", jmGroup.getMetricIdentifier("name")); - registry.shutdown(); } @Test - public void testGenerateScopeCustom() { + public void testGenerateScopeCustom() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc"); cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "some-constant.<job_name>"); @@ -73,11 +86,11 @@ public class TaskManagerJobGroupTest extends TestLogger { assertEquals( "some-constant.myJobName.name", jmGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testGenerateScopeCustomWildcard() { + public void testGenerateScopeCustomWildcard() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_TM, "peter.<tm_id>"); cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "*.some-constant.<job_id>"); @@ -95,13 +108,12 @@ public class TaskManagerJobGroupTest extends TestLogger { assertEquals( "peter.test-tm-id.some-constant." 
+ jid + ".name", jmGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test public void testCreateQueryServiceMetricInfo() { JobID jid = new JobID(); - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java index 47ee1a9..d9e6158 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java @@ -31,7 +31,9 @@ import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.AbstractID; import org.apache.flink.util.TestLogger; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertArrayEquals; @@ -43,13 +45,26 @@ import static org.junit.Assert.assertTrue; */ public class TaskMetricGroupTest extends TestLogger { + private MetricRegistryImpl registry; + + @Before + public void setup() { + registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + } + + @After + public void teardown() throws Exception { + if (registry != null) { + registry.shutdown().get(); + } + } + // ------------------------------------------------------------------------ // scope tests // 
----------------------------------------------------------------------- @Test public void testGenerateScopeDefault() { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); JobVertexID vertexId = new JobVertexID(); AbstractID executionId = new AbstractID(); @@ -64,11 +79,10 @@ public class TaskMetricGroupTest extends TestLogger { assertEquals( "theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13.name", taskGroup.getMetricIdentifier("name")); - registry.shutdown(); } @Test - public void testGenerateScopeCustom() { + public void testGenerateScopeCustom() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc"); cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "def"); @@ -91,11 +105,11 @@ public class TaskMetricGroupTest extends TestLogger { assertEquals( String.format("test-tm-id.%s.%s.%s.name", jid, vertexId, executionId), taskGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testGenerateScopeWilcard() { + public void testGenerateScopeWilcard() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_TASK, "*.<task_attempt_id>.<subtask_index>"); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); @@ -115,7 +129,7 @@ public class TaskMetricGroupTest extends TestLogger { assertEquals( "theHostName.taskmanager.test-tm-id.myJobName." 
+ executionId + ".13.name", taskGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test @@ -123,7 +137,6 @@ public class TaskMetricGroupTest extends TestLogger { JobID jid = new JobID(); JobVertexID vid = new JobVertexID(); AbstractID eid = new AbstractID(); - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5); @@ -136,7 +149,7 @@ public class TaskMetricGroupTest extends TestLogger { } @Test - public void testTaskMetricGroupCleanup() { + public void testTaskMetricGroupCleanup() throws Exception { CountingMetricRegistry registry = new CountingMetricRegistry(new Configuration()); TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(registry, "localhost", "0"); TaskManagerJobMetricGroup taskManagerJobMetricGroup = new TaskManagerJobMetricGroup(registry, taskManagerMetricGroup, new JobID(), "job"); @@ -150,11 +163,11 @@ public class TaskMetricGroupTest extends TestLogger { // now all registered metrics should have been unregistered assertEquals(0, registry.getNumberRegisteredMetrics()); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testOperatorNameTruncation() { + public void testOperatorNameTruncation() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, ScopeFormat.SCOPE_OPERATOR_NAME); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); @@ -168,6 +181,7 @@ public class TaskMetricGroupTest extends TestLogger { String storedName = operatorMetricGroup.getScopeComponents()[0]; Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, 
storedName.length()); Assert.assertEquals(originalName.substring(0, TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH), storedName); + registry.shutdown().get(); } private static class CountingMetricRegistry extends MetricRegistryImpl { http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 9d1af35..ed1aad3 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -457,7 +457,7 @@ public class YarnApplicationMasterRunner { if (metricRegistry != null) { try { - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); } catch (Throwable t) { LOG.error("Could not properly shut down the metric registry.", t); }
