[FLINK-8670] Make MetricRegistryImpl#shutdown non-blocking

This commit makes the MetricRegistryImpl#shutdown method non-blocking. Instead
of waiting for the shutdown procedure to finish, the method returns a
future which is completed once the metric registry has shut down.

This closes #5504.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e29ec0fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e29ec0fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e29ec0fb

Branch: refs/heads/master
Commit: e29ec0fbd2cb03a42b98142f63ce73b97dc2e915
Parents: d9b28e8
Author: Till Rohrmann <[email protected]>
Authored: Fri Feb 16 08:12:04 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Feb 23 18:22:09 2018 +0100

----------------------------------------------------------------------
 .../org/apache/flink/util/ExecutorUtils.java    |  21 ++++
 .../MesosApplicationMasterRunner.java           |   2 +-
 .../ScheduledDropwizardReporterTest.java        |   4 +-
 .../DropwizardFlinkHistogramWrapperTest.java    |   2 +-
 .../flink/metrics/jmx/JMXReporterTest.java      |   8 +-
 .../PrometheusReporterTaskScopeTest.java        |   4 +-
 .../prometheus/PrometheusReporterTest.java      |  20 +--
 .../flink/metrics/slf4j/Slf4jReporterTest.java  |   4 +-
 .../metrics/statsd/StatsDReporterTest.java      |   8 +-
 .../apache/flink/runtime/akka/ActorUtils.java   |  89 ++++++++++++++
 .../runtime/metrics/MetricRegistryImpl.java     | 122 ++++++++++---------
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   2 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   2 +-
 .../runtime/metrics/MetricRegistryImplTest.java |  42 +++----
 .../runtime/metrics/TaskManagerMetricsTest.java |   2 +-
 .../metrics/groups/AbstractMetricGroupTest.java |  10 +-
 .../metrics/groups/JobManagerGroupTest.java     |  16 +--
 .../metrics/groups/JobManagerJobGroupTest.java  |  12 +-
 .../groups/MetricGroupRegistrationTest.java     |   8 +-
 .../runtime/metrics/groups/MetricGroupTest.java |   4 +-
 .../metrics/groups/OperatorGroupTest.java       |  37 +++---
 .../metrics/groups/TaskManagerGroupTest.java    |  30 +++--
 .../metrics/groups/TaskManagerJobGroupTest.java |  28 +++--
 .../metrics/groups/TaskMetricGroupTest.java     |  34 ++++--
 .../flink/yarn/YarnApplicationMasterRunner.java |   2 +-
 26 files changed, 334 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
index d98bdd2..0a7f161 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -74,4 +75,24 @@ public class ExecutorUtils {
                        }
                }
        }
+
+       /**
+        * Shuts the given {@link ExecutorService} down in a non-blocking 
fashion. The shut down will
+        * be executed by a thread from the common fork-join pool.
+        *
+        * <p>The executor services will be shut down gracefully for the given 
timeout period. Afterwards
+        * {@link ExecutorService#shutdownNow()} will be called.
+        *
+        * @param timeout before {@link ExecutorService#shutdownNow()} is called
+        * @param unit time unit of the timeout
+        * @param executorServices to shut down
+        * @return Future which is completed once the {@link ExecutorService} 
are shut down
+        */
+       public static CompletableFuture<Void> nonBlockingShutdown(long timeout, 
TimeUnit unit, ExecutorService... executorServices) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               gracefulShutdown(timeout, unit, 
executorServices);
+                               return null;
+                       });
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 94804ac..630fa83 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -433,7 +433,7 @@ public class MesosApplicationMasterRunner {
 
                if (metricRegistry != null) {
                        try {
-                               metricRegistry.shutdown();
+                               metricRegistry.shutdown().get();
                        } catch (Throwable t) {
                                LOG.error("Could not shut down metric 
registry.", t);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 4a2ca3a..b69b8d8 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -74,7 +74,7 @@ public class ScheduledDropwizardReporterTest {
         * Tests that the registered metrics' names don't contain invalid 
characters.
         */
        @Test
-       public void testAddingMetrics() throws NoSuchFieldException, 
IllegalAccessException {
+       public void testAddingMetrics() throws Exception {
                Configuration configuration = new Configuration();
                String taskName = "test\"Ta\"..sk";
                String jobName = "testJ\"ob:-!ax..?";
@@ -131,7 +131,7 @@ public class ScheduledDropwizardReporterTest {
 
                assertEquals(expectedCounterName, counters.get(myCounter));
 
-               metricRegistry.shutdown();
+               metricRegistry.shutdown().get();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index fb21a75b..d23a22c 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -150,7 +150,7 @@ public class DropwizardFlinkHistogramWrapperTest extends 
TestLogger {
                        assertEquals(0, testingReporter.getMetrics().size());
                } finally {
                        if (registry != null) {
-                               registry.shutdown();
+                               registry.shutdown().get();
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 40b7f15..6e45646 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -145,7 +145,7 @@ public class JMXReporterTest extends TestLogger {
                rep1.notifyOfRemovedMetric(g2, "rep2", null);
 
                mg.close();
-               reg.shutdown();
+               reg.shutdown().get();
        }
 
        /**
@@ -219,7 +219,7 @@ public class JMXReporterTest extends TestLogger {
                rep1.close();
                rep2.close();
                mg.close();
-               reg.shutdown();
+               reg.shutdown().get();
        }
 
        /**
@@ -266,7 +266,7 @@ public class JMXReporterTest extends TestLogger {
 
                } finally {
                        if (registry != null) {
-                               registry.shutdown();
+                               registry.shutdown().get();
                        }
                }
        }
@@ -306,7 +306,7 @@ public class JMXReporterTest extends TestLogger {
 
                } finally {
                        if (registry != null) {
-                               registry.shutdown();
+                               registry.shutdown().get();
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
index d4ad1f9..724a79b 100644
--- 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
+++ 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
@@ -90,9 +90,9 @@ public class PrometheusReporterTaskScopeTest {
        }
 
        @After
-       public void shutdownRegistry() {
+       public void shutdownRegistry() throws Exception {
                if (registry != null) {
-                       registry.shutdown();
+                       registry.shutdown().get();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index 6833a06..e9fd985 100644
--- 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -84,9 +84,9 @@ public class PrometheusReporterTest extends TestLogger {
        }
 
        @After
-       public void shutdownRegistry() {
+       public void shutdownRegistry() throws Exception {
                if (registry != null) {
-                       registry.shutdown();
+                       registry.shutdown().get();
                }
        }
 
@@ -237,7 +237,7 @@ public class PrometheusReporterTest extends TestLogger {
        }
 
        @Test
-       public void cannotStartTwoReportersOnSamePort() {
+       public void cannotStartTwoReportersOnSamePort() throws Exception {
                final MetricRegistryImpl fixedPort1 = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 portRangeProvider.next())));
                assertThat(fixedPort1.getReporters(), hasSize(1));
 
@@ -246,12 +246,12 @@ public class PrometheusReporterTest extends TestLogger {
                final MetricRegistryImpl fixedPort2 = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2",
 String.valueOf(firstReporter.getPort()))));
                assertThat(fixedPort2.getReporters(), hasSize(0));
 
-               fixedPort1.shutdown();
-               fixedPort2.shutdown();
+               fixedPort1.shutdown().get();
+               fixedPort2.shutdown().get();
        }
 
        @Test
-       public void canStartTwoReportersWhenUsingPortRange() {
+       public void canStartTwoReportersWhenUsingPortRange() throws Exception {
                String portRange = portRangeProvider.next();
                final MetricRegistryImpl portRange1 = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 portRange)));
                final MetricRegistryImpl portRange2 = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2",
 portRange)));
@@ -259,8 +259,8 @@ public class PrometheusReporterTest extends TestLogger {
                assertThat(portRange1.getReporters(), hasSize(1));
                assertThat(portRange2.getReporters(), hasSize(1));
 
-               portRange1.shutdown();
-               portRange2.shutdown();
+               portRange1.shutdown().get();
+               portRange2.shutdown().get();
        }
 
        private String addMetricAndPollResponse(Metric metric, String 
metricName) throws UnirestException {
@@ -280,8 +280,8 @@ public class PrometheusReporterTest extends TestLogger {
        }
 
        @After
-       public void closeReporterAndShutdownRegistry() {
-               registry.shutdown();
+       public void closeReporterAndShutdownRegistry() throws Exception {
+               registry.shutdown().get();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
 
b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
index b344f45..172c79c 100644
--- 
a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
+++ 
b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
@@ -76,8 +76,8 @@ public class Slf4jReporterTest extends TestLogger {
        }
 
        @AfterClass
-       public static void tearDown() {
-               registry.shutdown();
+       public static void tearDown() throws Exception {
+               registry.shutdown().get();
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 08d4998..c9f5af0 100644
--- 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -73,7 +73,7 @@ public class StatsDReporterTest extends TestLogger {
         * Tests that the registered metrics' names don't contain invalid 
characters.
         */
        @Test
-       public void testAddingMetrics() throws NoSuchFieldException, 
IllegalAccessException {
+       public void testAddingMetrics() throws Exception {
                Configuration configuration = new Configuration();
                String taskName = "testTask";
                String jobName = "testJob:-!ax..?";
@@ -124,7 +124,7 @@ public class StatsDReporterTest extends TestLogger {
 
                assertEquals(expectedCounterName, counters.get(myCounter));
 
-               metricRegistry.shutdown();
+               metricRegistry.shutdown().get();
        }
 
        /**
@@ -187,7 +187,7 @@ public class StatsDReporterTest extends TestLogger {
 
                } finally {
                        if (registry != null) {
-                               registry.shutdown();
+                               registry.shutdown().get();
                        }
 
                        if (receiver != null) {
@@ -247,7 +247,7 @@ public class StatsDReporterTest extends TestLogger {
 
                } finally {
                        if (registry != null) {
-                               registry.shutdown();
+                               registry.shutdown().get();
                        }
 
                        if (receiver != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
new file mode 100644
index 0000000..f2f9059
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+
+import akka.actor.ActorRef;
+import akka.actor.Kill;
+import akka.pattern.Patterns;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Utility functions for the interaction with Akka {@link akka.actor.Actor}.
+ */
+public class ActorUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ActorUtils.class);
+
+       /**
+        * Shuts the given {@link akka.actor.Actor} down in a non blocking 
fashion. The method first tries to
+        * gracefully shut them down. If this is not successful, then the 
actors will be terminated by sending
+        * a {@link akka.actor.Kill} message.
+        *
+        * @param gracePeriod for the graceful shutdown
+        * @param timeUnit time unit of the grace period
+        * @param actors to shut down
+        * @return Future which is completed once all actors have been shut 
down gracefully or forceful
+        * kill messages have been sent to all actors. Occurring errors will be 
suppressed into one error.
+        */
+       public static CompletableFuture<Void> nonBlockingShutDown(long 
gracePeriod, TimeUnit timeUnit, ActorRef... actors) {
+               final Collection<CompletableFuture<Void>> terminationFutures = 
new ArrayList<>(actors.length);
+               final FiniteDuration timeout = new FiniteDuration(gracePeriod, 
timeUnit);
+
+               for (ActorRef actor : actors) {
+                       try {
+                               final Future<Boolean> booleanFuture = 
Patterns.gracefulStop(actor, timeout);
+                               final CompletableFuture<Void> terminationFuture 
= FutureUtils.toJava(booleanFuture)
+                                       .<Void>thenApply(ignored -> null)
+                                       .exceptionally((Throwable throwable) -> 
{
+                                               if (throwable instanceof 
TimeoutException) {
+                                                       // the actor did not 
gracefully stop within the grace period --> Let's kill him
+                                                       
actor.tell(Kill.getInstance(), ActorRef.noSender());
+                                                       return null;
+                                               } else {
+                                                       throw new 
CompletionException(throwable);
+                                               }
+                                       });
+
+                               terminationFutures.add(terminationFuture);
+                       } catch (IllegalStateException ignored) {
+                               // this can happen if the underlying actor 
system has been stopped before shutting
+                               // the actor down
+                               LOG.debug("The actor {} has already been 
stopped because the " +
+                                       "underlying ActorSystem has already 
been shut down.", actor.path());
+                       }
+               }
+
+               return FutureUtils.completeAll(terminationFutures);
+       }
+
+       private ActorUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index c8f4490..6b37709 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -28,35 +29,36 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.View;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.akka.ActorUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.actor.Kill;
-import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 /**
  * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It 
serves as the
  * connection between {@link MetricGroup MetricGroups} and {@link 
MetricReporter MetricReporters}.
@@ -66,8 +68,14 @@ public class MetricRegistryImpl implements MetricRegistry {
 
        private final Object lock = new Object();
 
-       private List<MetricReporter> reporters;
-       private ScheduledExecutorService executor;
+       private final List<MetricReporter> reporters;
+       private final ScheduledExecutorService executor;
+
+       private final ScopeFormats scopeFormats;
+       private final char globalDelimiter;
+       private final List<Character> delimiters;
+
+       private final CompletableFuture<Void> terminationFuture;
 
        @Nullable
        private ActorRef queryService;
@@ -77,9 +85,7 @@ public class MetricRegistryImpl implements MetricRegistry {
 
        private ViewUpdater viewUpdater;
 
-       private final ScopeFormats scopeFormats;
-       private final char globalDelimiter;
-       private final List<Character> delimiters = new ArrayList<>();
+       private boolean isShutdown;
 
        /**
         * Creates a new MetricRegistry and starts the configured reporter.
@@ -87,9 +93,12 @@ public class MetricRegistryImpl implements MetricRegistry {
        public MetricRegistryImpl(MetricRegistryConfiguration config) {
                this.scopeFormats = config.getScopeFormats();
                this.globalDelimiter = config.getDelimiter();
+               this.delimiters = new ArrayList<>(10);
+               this.terminationFuture = new CompletableFuture<>();
+               this.isShutdown = false;
 
                // second, instantiate any custom configured reporters
-               this.reporters = new ArrayList<>();
+               this.reporters = new ArrayList<>(4);
 
                List<Tuple2<String, Configuration>> reporterConfigurations = 
config.getReporterConfigurations();
 
@@ -226,71 +235,72 @@ public class MetricRegistryImpl implements MetricRegistry 
{
         */
        public boolean isShutdown() {
                synchronized (lock) {
-                       return reporters == null && executor.isShutdown();
+                       return isShutdown;
                }
        }
 
        /**
         * Shuts down this registry and the associated {@link MetricReporter}.
+        *
+        * <p>NOTE: This operation is asynchronous and returns a future which 
is completed
+        * once the shutdown operation has been completed.
+        *
+        * @return Future which is completed once the {@link MetricRegistryImpl}
+        * is shut down.
         */
-       public void shutdown() {
+       public CompletableFuture<Void> shutdown() {
                synchronized (lock) {
-                       Future<Boolean> stopFuture = null;
-                       FiniteDuration stopTimeout = null;
+                       if (isShutdown) {
+                               return terminationFuture;
+                       } else {
+                               isShutdown = true;
+                               final Collection<CompletableFuture<Void>> 
terminationFutures = new ArrayList<>(3);
+                               final Time gracePeriod = Time.seconds(1L);
 
-                       if (queryService != null) {
-                               stopTimeout = new FiniteDuration(1L, 
TimeUnit.SECONDS);
+                               if (queryService != null) {
+                                       final CompletableFuture<Void> 
queryServiceTerminationFuture = ActorUtils.nonBlockingShutDown(
+                                               gracePeriod.toMilliseconds(),
+                                               TimeUnit.MILLISECONDS,
+                                               queryService);
 
-                               try {
-                                       stopFuture = 
Patterns.gracefulStop(queryService, stopTimeout);
-                               } catch (IllegalStateException ignored) {
-                                       // this can happen if the underlying 
actor system has been stopped before shutting
-                                       // the metric registry down
-                                       // TODO: Pull the MetricQueryService 
actor out of the MetricRegistry
-                                       LOG.debug("The metric query service 
actor has already been stopped because the " +
-                                               "underlying ActorSystem has 
already been shut down.");
+                                       
terminationFutures.add(queryServiceTerminationFuture);
                                }
-                       }
 
-                       if (reporters != null) {
+                               Throwable throwable = null;
                                for (MetricReporter reporter : reporters) {
                                        try {
                                                reporter.close();
                                        } catch (Throwable t) {
-                                               LOG.warn("Metrics reporter did 
not shut down cleanly", t);
+                                               throwable = 
ExceptionUtils.firstOrSuppressed(t, throwable);
                                        }
                                }
-                               reporters = null;
-                       }
-                       shutdownExecutor();
-
-                       if (stopFuture != null) {
-                               boolean stopped = false;
-
-                               try {
-                                       stopped = Await.result(stopFuture, 
stopTimeout);
-                               } catch (Exception e) {
-                                       LOG.warn("Query actor did not properly 
stop.", e);
-                               }
+                               reporters.clear();
 
-                               if (!stopped) {
-                                       // the query actor did not stop in 
time, let's kill him
-                                       queryService.tell(Kill.getInstance(), 
ActorRef.noSender());
+                               if (throwable != null) {
+                                       terminationFutures.add(
+                                               
FutureUtils.completedExceptionally(
+                                                       new 
FlinkException("Could not shut down the metric reporters properly.", 
throwable)));
                                }
-                       }
-               }
-       }
 
-       private void shutdownExecutor() {
-               if (executor != null) {
-                       executor.shutdown();
+                               final CompletableFuture<Void> 
executorShutdownFuture = ExecutorUtils.nonBlockingShutdown(
+                                       gracePeriod.toMilliseconds(),
+                                       TimeUnit.MILLISECONDS,
+                                       executor);
+
+                               terminationFutures.add(executorShutdownFuture);
+
+                               FutureUtils
+                                       .completeAll(terminationFutures)
+                                       .whenComplete(
+                                               (Void ignored, Throwable error) 
-> {
+                                                       if (error != null) {
+                                                               
terminationFuture.completeExceptionally(error);
+                                                       } else {
+                                                               
terminationFuture.complete(null);
+                                                       }
+                                               });
 
-                       try {
-                               if (!executor.awaitTermination(1L, 
TimeUnit.SECONDS)) {
-                                       executor.shutdownNow();
-                               }
-                       } catch (InterruptedException e) {
-                               executor.shutdownNow();
+                               return terminationFuture;
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d0e401c..1dfaa5d 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2054,7 +2054,7 @@ object JobManager {
     }
 
     try {
-      metricRegistry.shutdown()
+      metricRegistry.shutdown().get()
     } catch {
       case t: Throwable =>
         LOG.warn("Could not properly shut down the metric registry.", t)

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 7948ba1..6c9ee5b 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -468,7 +468,7 @@ abstract class FlinkMiniCluster(
 
     Await.ready(Future.sequence(jmFutures ++ tmFutures ++ rmFutures), timeout)
 
-    metricRegistryOpt.foreach(_.shutdown())
+    metricRegistryOpt.foreach(_.shutdown().get())
 
     if (!useSingleActorSystem) {
       taskManagerActorSystems foreach {

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 106bea1..485add5 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1900,7 +1900,7 @@ object TaskManager {
 
     // shut down the metric query service
     try {
-      metricRegistry.shutdown()
+      metricRegistry.shutdown().get()
     } catch {
       case t: Throwable =>
         LOG.error("Could not properly shut down the metric registry.", t)

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
index 2eccc0c..adb622d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
@@ -59,12 +59,12 @@ public class MetricRegistryImplTest extends TestLogger {
        private static final char GLOBAL_DEFAULT_DELIMITER = '.';
 
        @Test
-       public void testIsShutdown() {
+       public void testIsShutdown() throws Exception {
                MetricRegistryImpl metricRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
                Assert.assertFalse(metricRegistry.isShutdown());
 
-               metricRegistry.shutdown();
+               metricRegistry.shutdown().get();
 
                Assert.assertTrue(metricRegistry.isShutdown());
        }
@@ -73,7 +73,7 @@ public class MetricRegistryImplTest extends TestLogger {
         * Verifies that the reporter name list is correctly used to determine 
which reporters should be instantiated.
         */
        @Test
-       public void testReporterInclusion() {
+       public void testReporterInclusion() throws Exception {
                Configuration config = new Configuration();
 
                config.setString(MetricOptions.REPORTERS_LIST, "test");
@@ -87,7 +87,7 @@ public class MetricRegistryImplTest extends TestLogger {
                Assert.assertTrue(TestReporter1.wasOpened);
                Assert.assertFalse(TestReporter11.wasOpened);
 
-               metricRegistry.shutdown();
+               metricRegistry.shutdown().get();
        }
 
        /**
@@ -106,7 +106,7 @@ public class MetricRegistryImplTest extends TestLogger {
         * Verifies that multiple reporters are instantiated correctly.
         */
        @Test
-       public void testMultipleReporterInstantiation() {
+       public void testMultipleReporterInstantiation() throws Exception {
                Configuration config = new Configuration();
 
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter11.class.getName());
@@ -121,7 +121,7 @@ public class MetricRegistryImplTest extends TestLogger {
                Assert.assertTrue(TestReporter12.wasOpened);
                Assert.assertTrue(TestReporter13.wasOpened);
 
-               metricRegistry.shutdown();
+               metricRegistry.shutdown().get();
        }
 
        /**
@@ -164,14 +164,14 @@ public class MetricRegistryImplTest extends TestLogger {
         * Verifies that configured arguments are properly forwarded to the 
reporter.
         */
        @Test
-       public void testReporterArgumentForwarding() {
+       public void testReporterArgumentForwarding() throws Exception {
                Configuration config = new Configuration();
 
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter2.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg1", "hello");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg2", "world");
 
-               new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)).shutdown();
+               new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)).shutdown().get();
 
                Assert.assertEquals("hello", TestReporter2.mc.getString("arg1", 
null));
                Assert.assertEquals("world", TestReporter2.mc.getString("arg2", 
null));
@@ -190,11 +190,9 @@ public class MetricRegistryImplTest extends TestLogger {
 
        /**
         * Verifies that reporters implementing the Scheduled interface are 
regularly called to report the metrics.
-        *
-        * @throws InterruptedException
         */
        @Test
-       public void testReporterScheduling() throws InterruptedException {
+       public void testReporterScheduling() throws Exception {
                Configuration config = new Configuration();
 
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter3.class.getName());
@@ -225,7 +223,7 @@ public class MetricRegistryImplTest extends TestLogger {
                }
                Assert.assertTrue("No report was triggered.", 
TestReporter3.reportCount > 0);
 
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        /**
@@ -244,7 +242,7 @@ public class MetricRegistryImplTest extends TestLogger {
         * Verifies that reporters are notified of added/removed metrics.
         */
        @Test
-       public void testReporterNotifications() {
+       public void testReporterNotifications() throws Exception {
                Configuration config = new Configuration();
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter6.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter7.class.getName());
@@ -268,7 +266,7 @@ public class MetricRegistryImplTest extends TestLogger {
                assertTrue(TestReporter7.removedMetric instanceof Counter);
                assertEquals("rootCounter", TestReporter7.removedMetricName);
 
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        /**
@@ -338,7 +336,7 @@ public class MetricRegistryImplTest extends TestLogger {
        }
 
        @Test
-       public void testConfigurableDelimiter() {
+       public void testConfigurableDelimiter() throws Exception {
                Configuration config = new Configuration();
                config.setString(MetricOptions.SCOPE_DELIMITER, "_");
                config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E");
@@ -348,11 +346,11 @@ public class MetricRegistryImplTest extends TestLogger {
                TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "host", "id");
                assertEquals("A_B_C_D_E_name", 
tmGroup.getMetricIdentifier("name"));
 
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        @Test
-       public void testConfigurableDelimiterForReporters() {
+       public void testConfigurableDelimiterForReporters() throws Exception {
                Configuration config = new Configuration();
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
@@ -370,11 +368,11 @@ public class MetricRegistryImplTest extends TestLogger {
                assertEquals(GLOBAL_DEFAULT_DELIMITER, 
registry.getDelimiter(3));
                assertEquals(GLOBAL_DEFAULT_DELIMITER, 
registry.getDelimiter(-1));
 
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        @Test
-       public void testConfigurableDelimiterForReportersInGroup() {
+       public void testConfigurableDelimiterForReportersInGroup() throws Exception {
                Configuration config = new Configuration();
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
@@ -395,7 +393,7 @@ public class MetricRegistryImplTest extends TestLogger {
                TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(registry, "host", "id");
                group.counter("C");
                group.close();
-               registry.shutdown();
+               registry.shutdown().get();
                assertEquals(4, TestReporter8.numCorrectDelimitersForRegister);
                assertEquals(4, 
TestReporter8.numCorrectDelimitersForUnregister);
        }
@@ -415,7 +413,7 @@ public class MetricRegistryImplTest extends TestLogger {
 
                ActorRef queryServiceActor = registry.getQueryService();
 
-               registry.shutdown();
+               registry.shutdown().get();
 
                try {
                        
Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout),
 timeout);
@@ -471,7 +469,7 @@ public class MetricRegistryImplTest extends TestLogger {
                assertEquals(metric, TestReporter7.removedMetric);
                assertEquals("counter", TestReporter7.removedMetricName);
 
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index fe22095..1798851 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -162,7 +162,7 @@ public class TaskManagerMetricsTest extends TestLogger {
                                
highAvailabilityServices.closeAndCleanupAllData();
                        }
 
-                       metricRegistry.shutdown();
+                       metricRegistry.shutdown().get();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index 325982b..f8ed3c6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -44,7 +44,7 @@ public class AbstractMetricGroupTest {
         * called and the parent is null.
         */
        @Test
-       public void testGetAllVariables() {
+       public void testGetAllVariables() throws Exception {
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
                AbstractMetricGroup group = new 
AbstractMetricGroup<AbstractMetricGroup<?>>(registry, new String[0], null) {
@@ -60,7 +60,7 @@ public class AbstractMetricGroupTest {
                };
                assertTrue(group.getAllVariables().isEmpty());
 
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        // 
========================================================================
@@ -101,7 +101,7 @@ public class AbstractMetricGroupTest {
                                }
                        }
                } finally {
-                       testRegistry.shutdown();
+                       testRegistry.shutdown().get();
                }
        }
 
@@ -176,7 +176,7 @@ public class AbstractMetricGroupTest {
        }
 
        @Test
-       public void testScopeGenerationWithoutReporters() {
+       public void testScopeGenerationWithoutReporters() throws Exception {
                Configuration config = new Configuration();
                config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
                MetricRegistryImpl testRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
@@ -193,7 +193,7 @@ public class AbstractMetricGroupTest {
                        assertEquals("A.X.C.D.1", 
group.getMetricIdentifier("1", FILTER_B, -1));
                        assertEquals("A.X.C.D.1", 
group.getMetricIdentifier("1", FILTER_B, 2));
                } finally {
-                       testRegistry.shutdown();
+                       testRegistry.shutdown().get();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index 05a72ac..cb5ec67 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -44,7 +44,7 @@ public class JobManagerGroupTest extends TestLogger {
        // 
------------------------------------------------------------------------
 
        @Test
-       public void addAndRemoveJobs() {
+       public void addAndRemoveJobs() throws Exception {
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                final JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost");
 
@@ -72,11 +72,11 @@ public class JobManagerGroupTest extends TestLogger {
                assertTrue(jmJobGroup21.isClosed());
                assertEquals(0, group.numRegisteredJobMetricGroups());
 
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        @Test
-       public void testCloseClosesAll() {
+       public void testCloseClosesAll() throws Exception {
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                final JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost");
 
@@ -94,7 +94,7 @@ public class JobManagerGroupTest extends TestLogger {
                assertTrue(jmJobGroup11.isClosed());
                assertTrue(jmJobGroup21.isClosed());
 
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        // 
------------------------------------------------------------------------
@@ -102,18 +102,18 @@ public class JobManagerGroupTest extends TestLogger {
        // 
------------------------------------------------------------------------
 
        @Test
-       public void testGenerateScopeDefault() {
+       public void testGenerateScopeDefault() throws Exception {
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost");
 
                assertArrayEquals(new String[]{"localhost", "jobmanager"}, 
group.getScopeComponents());
                assertEquals("localhost.jobmanager.name", 
group.getMetricIdentifier("name"));
 
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        @Test
-       public void testGenerateScopeCustom() {
+       public void testGenerateScopeCustom() throws Exception {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_JM, 
"constant.<host>.foo.<host>");
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
@@ -123,7 +123,7 @@ public class JobManagerGroupTest extends TestLogger {
                assertArrayEquals(new String[]{"constant", "host", "foo", 
"host"}, group.getScopeComponents());
                assertEquals("constant.host.foo.host.name", 
group.getMetricIdentifier("name"));
 
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index 4373f80..6f4751b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -38,7 +38,7 @@ import static org.junit.Assert.assertEquals;
 public class JobManagerJobGroupTest extends TestLogger {
 
        @Test
-       public void testGenerateScopeDefault() {
+       public void testGenerateScopeDefault() throws Exception {
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
                JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName");
@@ -52,11 +52,11 @@ public class JobManagerJobGroupTest extends TestLogger {
                                "theHostName.jobmanager.myJobName.name",
                                jmGroup.getMetricIdentifier("name"));
 
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        @Test
-       public void testGenerateScopeCustom() {
+       public void testGenerateScopeCustom() throws Exception {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc");
                cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, 
"some-constant.<job_name>");
@@ -75,11 +75,11 @@ public class JobManagerJobGroupTest extends TestLogger {
                                "some-constant.myJobName.name",
                                jmGroup.getMetricIdentifier("name"));
 
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        @Test
-       public void testGenerateScopeCustomWildcard() {
+       public void testGenerateScopeCustomWildcard() throws Exception {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter");
                cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, 
"*.some-constant.<job_id>");
@@ -98,7 +98,7 @@ public class JobManagerJobGroupTest extends TestLogger {
                                "peter.some-constant." + jid + ".name",
                                jmGroup.getMetricIdentifier("name"));
 
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index bcdcd63..22148db 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -44,7 +44,7 @@ public class MetricGroupRegistrationTest extends TestLogger {
         * Verifies that group methods instantiate the correct metric with the 
given name.
         */
        @Test
-       public void testMetricInstantiation() {
+       public void testMetricInstantiation() throws Exception {
                Configuration config = new Configuration();
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
 
@@ -85,7 +85,7 @@ public class MetricGroupRegistrationTest extends TestLogger {
 
                Assert.assertEquals(histogram, TestReporter1.lastPassedMetric);
                assertEquals("histogram", TestReporter1.lastPassedName);
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        /**
@@ -107,7 +107,7 @@ public class MetricGroupRegistrationTest extends TestLogger {
         * Verifies that when attempting to create a group with the name of an 
existing one the existing one will be returned instead.
         */
        @Test
-       public void testDuplicateGroupName() {
+       public void testDuplicateGroupName() throws Exception {
                Configuration config = new Configuration();
 
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
@@ -119,6 +119,6 @@ public class MetricGroupRegistrationTest extends TestLogger {
                MetricGroup group3 = root.addGroup("group");
                Assert.assertTrue(group1 == group2 && group2 == group3);
 
-               registry.shutdown();
+               registry.shutdown().get();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 4dc5edf..71ae7f1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -62,8 +62,8 @@ public class MetricGroupTest extends TestLogger {
        }
 
        @After
-       public void shutdownRegistry() {
-               this.registry.shutdown();
+       public void shutdownRegistry() throws Exception {
+               this.registry.shutdown().get();
                this.registry = null;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index 820b73e..58198e4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -32,6 +32,8 @@ import 
org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Map;
@@ -45,10 +47,22 @@ import static org.junit.Assert.assertNotNull;
  */
 public class OperatorGroupTest extends TestLogger {
 
-       @Test
-       public void testGenerateScopeDefault() {
-               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+       private MetricRegistryImpl registry;
+
+       @Before
+       public void setup() {
+               registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+       }
+
+       @After
+       public void teardown() throws Exception {
+               if (registry != null) {
+                       registry.shutdown().get();
+               }
+       }
 
+       @Test
+       public void testGenerateScopeDefault() throws Exception {
                TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
                TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
                TaskMetricGroup taskGroup = new TaskMetricGroup(
@@ -62,12 +76,10 @@ public class OperatorGroupTest extends TestLogger {
                assertEquals(
                                
"theHostName.taskmanager.test-tm-id.myJobName.myOpName.11.name",
                                opGroup.getMetricIdentifier("name"));
-
-               registry.shutdown();
        }
 
        @Test
-       public void testGenerateScopeCustom() {
+       public void testGenerateScopeCustom() throws Exception {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, 
"<tm_id>.<job_id>.<task_id>.<operator_name>.<operator_id>");
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
@@ -91,14 +103,12 @@ public class OperatorGroupTest extends TestLogger {
                                String.format("%s.%s.%s.%s.%s.name", tmID, jid, 
vertexId, operatorName, operatorID),
                                operatorGroup.getMetricIdentifier("name"));
                } finally {
-                       registry.shutdown();
+                       registry.shutdown().get();
                }
        }
 
        @Test
-       public void testIOMetricGroupInstantiation() {
-               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-
+       public void testIOMetricGroupInstantiation() throws Exception {
                TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
                TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
                TaskMetricGroup taskGroup = new TaskMetricGroup(
@@ -108,14 +118,10 @@ public class OperatorGroupTest extends TestLogger {
                assertNotNull(opGroup.getIOMetricGroup());
                
assertNotNull(opGroup.getIOMetricGroup().getNumRecordsInCounter());
                
assertNotNull(opGroup.getIOMetricGroup().getNumRecordsOutCounter());
-
-               registry.shutdown();
        }
 
        @Test
        public void testVariables() {
-               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-
                JobID jid = new JobID();
                JobVertexID tid = new JobVertexID();
                AbstractID eid = new AbstractID();
@@ -140,8 +146,6 @@ public class OperatorGroupTest extends TestLogger {
                testVariable(variables, ScopeFormat.SCOPE_TASK_ATTEMPT_NUM, 
"0");
                testVariable(variables, ScopeFormat.SCOPE_OPERATOR_ID, 
oid.toString());
                testVariable(variables, ScopeFormat.SCOPE_OPERATOR_NAME, 
"myOpName");
-
-               registry.shutdown();
        }
 
        private static void testVariable(Map<String, String> variables, String 
key, String expectedValue) {
@@ -156,7 +160,6 @@ public class OperatorGroupTest extends TestLogger {
                JobVertexID vid = new JobVertexID();
                AbstractID eid = new AbstractID();
                OperatorID oid = new OperatorID();
-               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                TaskManagerMetricGroup tm = new 
TaskManagerMetricGroup(registry, "host", "id");
                TaskManagerJobMetricGroup job = new 
TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
                TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, 
eid, "taskName", 4, 5);

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index 3272f73..addbcbd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -30,6 +30,8 @@ import 
org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -44,14 +46,26 @@ import static org.junit.Assert.assertTrue;
  */
 public class TaskManagerGroupTest extends TestLogger {
 
+       private MetricRegistryImpl registry;
+
+       @Before
+       public void setup() {
+               registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+       }
+
+       @After
+       public void teardown() throws Exception {
+               if (registry != null) {
+                       registry.shutdown().get();
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  adding and removing jobs
        // 
------------------------------------------------------------------------
 
        @Test
        public void addAndRemoveJobs() throws IOException {
-               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-
                final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
                                registry, "localhost", new 
AbstractID().toString());
 
@@ -106,13 +120,10 @@ public class TaskManagerGroupTest extends TestLogger {
                assertTrue(tmGroup13.parent().isClosed());
 
                assertEquals(0, group.numRegisteredJobMetricGroups());
-
-               registry.shutdown();
        }
 
        @Test
        public void testCloseClosesAll() throws IOException {
-               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
                        registry, "localhost", new AbstractID().toString());
 
@@ -142,8 +153,6 @@ public class TaskManagerGroupTest extends TestLogger {
                assertTrue(tmGroup11.isClosed());
                assertTrue(tmGroup12.isClosed());
                assertTrue(tmGroup21.isClosed());
-
-               registry.shutdown();
        }
 
        // 
------------------------------------------------------------------------
@@ -152,16 +161,14 @@ public class TaskManagerGroupTest extends TestLogger {
 
        @Test
        public void testGenerateScopeDefault() {
-               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(registry, "localhost", "id");
 
                assertArrayEquals(new String[]{"localhost", "taskmanager", 
"id"}, group.getScopeComponents());
                assertEquals("localhost.taskmanager.id.name", 
group.getMetricIdentifier("name"));
-               registry.shutdown();
        }
 
        @Test
-       public void testGenerateScopeCustom() {
+       public void testGenerateScopeCustom() throws Exception {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_TM, 
"constant.<host>.foo.<host>");
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
@@ -169,12 +176,11 @@ public class TaskManagerGroupTest extends TestLogger {
 
                assertArrayEquals(new String[]{"constant", "host", "foo", 
"host"}, group.getScopeComponents());
                assertEquals("constant.host.foo.host.name", 
group.getMetricIdentifier("name"));
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        @Test
        public void testCreateQueryServiceMetricInfo() {
-               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                TaskManagerMetricGroup tm = new 
TaskManagerMetricGroup(registry, "host", "id");
 
                QueryScopeInfo.TaskManagerQueryScopeInfo info = 
tm.createQueryServiceMetricInfo(new DummyCharacterFilter());

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
index b6be31c..52ee578 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -37,10 +39,22 @@ import static org.junit.Assert.assertEquals;
  */
 public class TaskManagerJobGroupTest extends TestLogger {
 
+       private MetricRegistryImpl registry;
+
+       @Before
+       public void setup() {
+               registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+       }
+
+       @After
+       public void teardown() throws Exception {
+               if (registry != null) {
+                       registry.shutdown().get();
+               }
+       }
+
        @Test
        public void testGenerateScopeDefault() {
-               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-
                TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
                JobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
 
@@ -51,11 +65,10 @@ public class TaskManagerJobGroupTest extends TestLogger {
                assertEquals(
                                
"theHostName.taskmanager.test-tm-id.myJobName.name",
                                jmGroup.getMetricIdentifier("name"));
-               registry.shutdown();
        }
 
        @Test
-       public void testGenerateScopeCustom() {
+       public void testGenerateScopeCustom() throws Exception {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc");
                cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, 
"some-constant.<job_name>");
@@ -73,11 +86,11 @@ public class TaskManagerJobGroupTest extends TestLogger {
                assertEquals(
                                "some-constant.myJobName.name",
                                jmGroup.getMetricIdentifier("name"));
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        @Test
-       public void testGenerateScopeCustomWildcard() {
+       public void testGenerateScopeCustomWildcard() throws Exception {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_TM, "peter.<tm_id>");
                cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, 
"*.some-constant.<job_id>");
@@ -95,13 +108,12 @@ public class TaskManagerJobGroupTest extends TestLogger {
                assertEquals(
                                "peter.test-tm-id.some-constant." + jid + 
".name",
                                jmGroup.getMetricIdentifier("name"));
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        @Test
        public void testCreateQueryServiceMetricInfo() {
                JobID jid = new JobID();
-               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                TaskManagerMetricGroup tm = new 
TaskManagerMetricGroup(registry, "host", "id");
                TaskManagerJobMetricGroup job = new 
TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index 47ee1a9..d9e6158 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -31,7 +31,9 @@ import 
org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -43,13 +45,26 @@ import static org.junit.Assert.assertTrue;
  */
 public class TaskMetricGroupTest extends TestLogger {
 
+       private MetricRegistryImpl registry;
+
+       @Before
+       public void setup() {
+               registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+       }
+
+       @After
+       public void teardown() throws Exception {
+               if (registry != null) {
+                       registry.shutdown().get();
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  scope tests
        // 
-----------------------------------------------------------------------
 
        @Test
        public void testGenerateScopeDefault() {
-               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                JobVertexID vertexId = new JobVertexID();
                AbstractID executionId = new AbstractID();
 
@@ -64,11 +79,10 @@ public class TaskMetricGroupTest extends TestLogger {
                assertEquals(
                                
"theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13.name",
                                taskGroup.getMetricIdentifier("name"));
-               registry.shutdown();
        }
 
        @Test
-       public void testGenerateScopeCustom() {
+       public void testGenerateScopeCustom() throws Exception {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc");
                cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "def");
@@ -91,11 +105,11 @@ public class TaskMetricGroupTest extends TestLogger {
                assertEquals(
                                String.format("test-tm-id.%s.%s.%s.name", jid, 
vertexId, executionId),
                                taskGroup.getMetricIdentifier("name"));
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        @Test
-       public void testGenerateScopeWilcard() {
+       public void testGenerateScopeWilcard() throws Exception {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_TASK, 
"*.<task_attempt_id>.<subtask_index>");
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
@@ -115,7 +129,7 @@ public class TaskMetricGroupTest extends TestLogger {
                assertEquals(
                                "theHostName.taskmanager.test-tm-id.myJobName." 
+ executionId + ".13.name",
                                taskGroup.getMetricIdentifier("name"));
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        @Test
@@ -123,7 +137,6 @@ public class TaskMetricGroupTest extends TestLogger {
                JobID jid = new JobID();
                JobVertexID vid = new JobVertexID();
                AbstractID eid = new AbstractID();
-               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
                TaskManagerMetricGroup tm = new 
TaskManagerMetricGroup(registry, "host", "id");
                TaskManagerJobMetricGroup job = new 
TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
                TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, 
eid, "taskName", 4, 5);
@@ -136,7 +149,7 @@ public class TaskMetricGroupTest extends TestLogger {
        }
 
        @Test
-       public void testTaskMetricGroupCleanup() {
+       public void testTaskMetricGroupCleanup() throws Exception {
                CountingMetricRegistry registry = new 
CountingMetricRegistry(new Configuration());
                TaskManagerMetricGroup taskManagerMetricGroup = new 
TaskManagerMetricGroup(registry, "localhost", "0");
                TaskManagerJobMetricGroup taskManagerJobMetricGroup = new 
TaskManagerJobMetricGroup(registry, taskManagerMetricGroup, new JobID(), "job");
@@ -150,11 +163,11 @@ public class TaskMetricGroupTest extends TestLogger {
                // now all registered metrics should have been unregistered
                assertEquals(0, registry.getNumberRegisteredMetrics());
 
-               registry.shutdown();
+               registry.shutdown().get();
        }
 
        @Test
-       public void testOperatorNameTruncation() {
+       public void testOperatorNameTruncation() throws Exception {
                Configuration cfg = new Configuration();
                cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, 
ScopeFormat.SCOPE_OPERATOR_NAME);
                MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
@@ -168,6 +181,7 @@ public class TaskMetricGroupTest extends TestLogger {
                String storedName = operatorMetricGroup.getScopeComponents()[0];
                
Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, 
storedName.length());
                Assert.assertEquals(originalName.substring(0, 
TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH), storedName);
+               registry.shutdown().get();
        }
 
        private static class CountingMetricRegistry extends MetricRegistryImpl {

http://git-wip-us.apache.org/repos/asf/flink/blob/e29ec0fb/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 9d1af35..ed1aad3 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -457,7 +457,7 @@ public class YarnApplicationMasterRunner {
 
                if (metricRegistry != null) {
                        try {
-                               metricRegistry.shutdown();
+                               metricRegistry.shutdown().get();
                        } catch (Throwable t) {
                                LOG.error("Could not properly shut down the 
metric registry.", t);
                        }

Reply via email to