[FLINK-4871] [mini cluster] Add memory calculation for TaskManagers to MiniCluster

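If the managed memory size for the TaskManagers has not been set in the
Configuration, it is calculated automatically: the available free heap memory
is divided equally among all distributed components (JobManagers,
ResourceManagers and TaskManagers), the network buffer memory is subtracted
from each TaskManager's share, and the managed memory fraction is applied to
the remainder. Additionally, this PR allows passing a MetricRegistry to the
TaskManagerRunner, so that the TaskManagers can use the MiniCluster's
MetricRegistry.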

Add memory calculation for task managers

This closes #2669.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8b22e04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8b22e04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8b22e04

Branch: refs/heads/master
Commit: d8b22e0448216ae54d0f0cf0ece3f2afe71f2593
Parents: c001bec
Author: Till Rohrmann <[email protected]>
Authored: Thu Oct 20 11:07:08 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Dec 23 20:54:24 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  |   7 +-
 .../minicluster/MiniClusterConfiguration.java   |  86 ++++++++++++++-
 .../resourcemanager/JobLeaderIdService.java     |   2 +-
 .../resourcemanager/ResourceManager.java        |   3 +-
 .../resourcemanager/ResourceManagerRunner.java  | 102 ++++++++++++++++++
 .../exceptions/ConfigurationException.java      |   2 +-
 .../exceptions/ResourceManagerRunner.java       | 105 -------------------
 .../runtime/taskexecutor/TaskExecutor.java      |  41 +++-----
 .../runtime/taskexecutor/TaskManagerRunner.java |  28 ++++-
 9 files changed, 229 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index b005330..611d4c4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -32,7 +32,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerRunner;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
@@ -161,7 +161,7 @@ public class MiniCluster {
                        LOG.info("Starting Flink Mini Cluster");
                        LOG.debug("Using configuration {}", config);
 
-                       final Configuration configuration = new 
UnmodifiableConfiguration(config.getConfiguration());
+                       final Configuration configuration = new 
UnmodifiableConfiguration(config.generateConfiguration());
                        final Time rpcTimeout = config.getRpcTimeout();
                        final int numJobManagers = config.getNumJobManagers();
                        final int numTaskManagers = config.getNumTaskManagers();
@@ -468,7 +468,8 @@ public class MiniCluster {
                                configuration,
                                new ResourceID(UUID.randomUUID().toString()),
                                taskManagerRpcServices[i],
-                               haServices);
+                               haServices,
+                               metricRegistry);
 
                        taskManagerRunners[i].start();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index cfbbffb..3a03ca3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.minicluster;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -41,6 +44,8 @@ public class MiniClusterConfiguration {
 
        private String commonBindAddress;
 
+       private long managedMemoryPerTaskManager = -1;
+
        // 
------------------------------------------------------------------------
        //  Construction
        // 
------------------------------------------------------------------------
@@ -96,14 +101,15 @@ public class MiniClusterConfiguration {
                this.commonBindAddress = bindAddress;
        }
 
+       public void setManagedMemoryPerTaskManager(long 
managedMemoryPerTaskManager) {
+               checkArgument(managedMemoryPerTaskManager > 0, "must have more 
than 0 MB of memory for the TaskManager.");
+               this.managedMemoryPerTaskManager = managedMemoryPerTaskManager;
+       }
+
        // 
------------------------------------------------------------------------
        //  getters
        // 
------------------------------------------------------------------------
 
-       public Configuration getConfiguration() {
-               return config;
-       }
-
        public boolean getUseSingleRpcSystem() {
                return singleRpcService;
        }
@@ -147,10 +153,23 @@ public class MiniClusterConfiguration {
                return Time.of(duration.length(), duration.unit());
        }
 
+       public long getManagedMemoryPerTaskManager() {
+               return getOrCalculateManagedMemoryPerTaskManager();
+       }
+
        // 
------------------------------------------------------------------------
        //  utils
        // 
------------------------------------------------------------------------
 
+       public Configuration generateConfiguration() {
+               Configuration newConfiguration = new Configuration(config);
+               // set the memory
+               long memory = getOrCalculateManagedMemoryPerTaskManager();
+               
newConfiguration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memory);
+
+               return newConfiguration;
+       }
+
        @Override
        public String toString() {
                return "MiniClusterConfiguration {" +
@@ -162,4 +181,63 @@ public class MiniClusterConfiguration {
                                ", config=" + config +
                                '}';
        }
+
+       /**
+        * Get or calculate the managed memory per task manager. The memory is 
calculated in the
+        * following order:
+        *
+        * 1. Return {@link #managedMemoryPerTaskManager} if set
+        * 2. Return 
config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY) if set
+        * 3. Distribute the available free memory equally among all components 
(JMs, RMs and TMs) and
+        * calculate the managed memory from the share of memory for a single 
task manager.
+        *
+        * @return
+        */
+       private long getOrCalculateManagedMemoryPerTaskManager() {
+               if (managedMemoryPerTaskManager == -1) {
+                       // no memory set in the mini cluster configuration
+                       final ConfigOption<Integer> memorySizeOption = 
ConfigOptions
+                               
.key(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY)
+                               .defaultValue(-1);
+
+                       int memorySize = config.getInteger(memorySizeOption);
+
+                       if (memorySize == -1) {
+                               // no memory set in the flink configuration
+                               // share the available memory among all running 
components
+                               final ConfigOption<Integer> bufferSizeOption = 
ConfigOptions
+                                       
.key(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY)
+                                       
.defaultValue(ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
+
+                               final ConfigOption<Long> bufferMemoryOption = 
ConfigOptions
+                                       
.key(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)
+                                       .defaultValue((long) 
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+
+                               final ConfigOption<Float> memoryFractionOption 
= ConfigOptions
+                                       
.key(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY)
+                                       
.defaultValue(ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+
+                               float memoryFraction = 
config.getFloat(memoryFractionOption);
+                               long networkBuffersMemory = 
config.getLong(bufferMemoryOption) * config.getInteger(bufferSizeOption);
+
+                               long freeMemory = 
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
+
+                               // we assign each component the same amount of 
free memory
+                               // (might be a bit of overkill for the JMs and 
RMs)
+                               long memoryPerComponent = freeMemory / 
(numTaskManagers + numResourceManagers + numJobManagers);
+
+                               // subtract the network buffer memory
+                               long memoryMinusNetworkBuffers = 
memoryPerComponent - networkBuffersMemory;
+
+                               // calculate the managed memory size
+                               long managedMemoryBytes = (long) 
(memoryMinusNetworkBuffers * memoryFraction);
+
+                               return managedMemoryBytes >>> 20;
+                       } else {
+                               return memorySize;
+                       }
+               } else {
+                       return managedMemoryPerTaskManager;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 6c7e249..56e72c0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -59,7 +59,7 @@ public class JobLeaderIdService {
        /** Actions to call when the job leader changes */
        private JobLeaderIdActions jobLeaderIdActions;
 
-       public JobLeaderIdService(HighAvailabilityServices 
highAvailabilityServices) throws Exception {
+       public JobLeaderIdService(HighAvailabilityServices 
highAvailabilityServices) {
                this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityServices);
 
                this.runningJobsRegistry = 
highAvailabilityServices.getRunningJobsRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 7240087..a81c214 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -156,8 +156,9 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                        throw new ResourceManagerException("Could not create 
the slot manager.", e);
                }
 
+               leaderElectionService = 
highAvailabilityServices.getResourceManagerLeaderElectionService();
+
                try {
-                       leaderElectionService = 
highAvailabilityServices.getResourceManagerLeaderElectionService();
                        leaderElectionService.start(this);
                } catch (Exception e) {
                        throw new ResourceManagerException("Could not start the 
leader election service.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
new file mode 100644
index 0000000..959b727
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple {@link StandaloneResourceManager} runner. It instantiates the 
resource manager's services
+ * and handles fatal errors by shutting the resource manager down.
+ */
+public class ResourceManagerRunner implements FatalErrorHandler {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ResourceManagerRunner.class);
+
+       private final Object lock = new Object();
+
+       private final ResourceManager<?> resourceManager;
+
+       public ResourceManagerRunner(
+                       final Configuration configuration,
+                       final RpcService rpcService,
+                       final HighAvailabilityServices highAvailabilityServices,
+                       final MetricRegistry metricRegistry) throws 
ConfigurationException {
+
+               Preconditions.checkNotNull(configuration);
+               Preconditions.checkNotNull(rpcService);
+               Preconditions.checkNotNull(highAvailabilityServices);
+               Preconditions.checkNotNull(metricRegistry);
+
+               final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(configuration);
+               final SlotManagerFactory slotManagerFactory = new 
DefaultSlotManager.Factory();
+               final JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(highAvailabilityServices);
+
+               this.resourceManager = new StandaloneResourceManager(
+                       rpcService,
+                       resourceManagerConfiguration,
+                       highAvailabilityServices,
+                       slotManagerFactory,
+                       metricRegistry,
+                       jobLeaderIdService,
+                       this);
+       }
+
+       
//-------------------------------------------------------------------------------------
+       // Lifecycle management
+       
//-------------------------------------------------------------------------------------
+
+       public void start() throws Exception {
+               resourceManager.start();
+       }
+
+       public void shutDown() throws Exception {
+               shutDownInternally();
+       }
+
+       private void shutDownInternally() throws Exception {
+               synchronized (lock) {
+                       resourceManager.shutDown();
+               }
+       }
+
+       
//-------------------------------------------------------------------------------------
+       // Fatal error handler
+       
//-------------------------------------------------------------------------------------
+
+       @Override
+       public void onFatalError(Throwable exception) {
+               LOG.error("Encountered fatal error.", exception);
+
+               try {
+                       shutDownInternally();
+               } catch (Exception e) {
+                       LOG.error("Could not properly shut down the resource 
manager.", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
index f081fff..0007318 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.exceptions;
 
 /**
- * Base class for configuration related exception which occur when creating a 
configuration.
+ * Exception which occures when creating a configuration object fails.
  */
 public class ConfigurationException extends Exception {
        private static final long serialVersionUID = 3971647332059381556L;

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
deleted file mode 100644
index 0c7e4e4..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.exceptions;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple {@link StandaloneResourceManager} runner. It instantiates the 
resource manager's services
- * and handles fatal errors by shutting the resource manager down.
- */
-public class ResourceManagerRunner implements FatalErrorHandler {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(ResourceManagerRunner.class);
-
-       private final Object lock = new Object();
-
-       private final ResourceManager<?> resourceManager;
-
-       public ResourceManagerRunner(
-                       final Configuration configuration,
-                       final RpcService rpcService,
-                       final HighAvailabilityServices highAvailabilityServices,
-                       final MetricRegistry metricRegistry) throws Exception {
-
-               Preconditions.checkNotNull(configuration);
-               Preconditions.checkNotNull(rpcService);
-               Preconditions.checkNotNull(highAvailabilityServices);
-               Preconditions.checkNotNull(metricRegistry);
-
-               final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(configuration);
-               final SlotManagerFactory slotManagerFactory = new 
DefaultSlotManager.Factory();
-               final JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(highAvailabilityServices);
-
-               this.resourceManager = new StandaloneResourceManager(
-                       rpcService,
-                       resourceManagerConfiguration,
-                       highAvailabilityServices,
-                       slotManagerFactory,
-                       metricRegistry,
-                       jobLeaderIdService,
-                       this);
-       }
-
-       
//-------------------------------------------------------------------------------------
-       // Lifecycle management
-       
//-------------------------------------------------------------------------------------
-
-       public void start() throws Exception {
-               resourceManager.start();
-       }
-
-       public void shutDown() throws Exception {
-               shutDownInternally();
-       }
-
-       private void shutDownInternally() throws Exception {
-               synchronized (lock) {
-                       resourceManager.shutDown();
-               }
-       }
-
-       
//-------------------------------------------------------------------------------------
-       // Fatal error handler
-       
//-------------------------------------------------------------------------------------
-
-       @Override
-       public void onFatalError(Throwable exception) {
-               LOG.error("Encountered fatal error.", exception);
-
-               try {
-                       shutDownInternally();
-               } catch (Exception e) {
-                       LOG.error("Could not properly shut down the resource 
manager.", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 8187fde..b981829 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -80,6 +80,7 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -217,47 +218,33 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
         * Called to shut down the TaskManager. The method closes all 
TaskManager services.
         */
        @Override
-       public void shutDown() {
+       public void shutDown() throws Exception {
                log.info("Stopping TaskManager {}.", getAddress());
 
+               Exception exception = null;
+
                taskSlotTable.stop();
 
                if (isConnectedToResourceManager()) {
-                       try {
-                               resourceManagerConnection.close();
-                       } catch (Exception e) {
-                               log.error("Could not cleanly close the 
ResourceManager connection.", e);
-                       }
+                       resourceManagerConnection.close();
                }
 
-               try {
-                       ioManager.shutdown();
-               } catch (Exception e) {
-                       log.error("IOManager did not shut down properly.", e);
-               }
+               ioManager.shutdown();
 
-               try {
-                       memoryManager.shutdown();
-               } catch (Exception e) {
-                       log.error("MemoryManager did not shut down properly.", 
e);
-               }
+               memoryManager.shutdown();
 
-               try {
-                       networkEnvironment.shutdown();
-               } catch (Exception e) {
-                       log.error("Network environment did not shut down 
properly.", e);
-               }
+               networkEnvironment.shutdown();
+
+               fileCache.shutdown();
 
                try {
-                       fileCache.shutdown();
+                       super.shutDown();
                } catch (Exception e) {
-                       log.error("File cache did not shut down properly.", e);
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                }
 
-               try {
-                       metricRegistry.shutdown();
-               } catch (Exception e) {
-                       log.error("MetricRegistry did not shut down properly.", 
e);
+               if (exception != null) {
+                       ExceptionUtils.rethrowException(exception, "Error while 
shutting the TaskExecutor down.");
                }
 
                log.info("Stopped TaskManager {}.", getAddress());

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b22e04/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 7d9ee55..99a7c5d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -24,9 +24,12 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcServiceUtils;
+import 
org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
 import org.apache.flink.util.Preconditions;
@@ -66,7 +69,8 @@ public class TaskManagerRunner implements FatalErrorHandler {
                Configuration configuration,
                ResourceID resourceID,
                RpcService rpcService,
-               HighAvailabilityServices highAvailabilityServices) throws 
Exception {
+               HighAvailabilityServices highAvailabilityServices,
+               MetricRegistry metricRegistry) throws Exception {
 
                this.configuration = Preconditions.checkNotNull(configuration);
                this.resourceID = Preconditions.checkNotNull(resourceID);
@@ -81,10 +85,20 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
                        remoteAddress,
                        false);
 
-               TaskManagerServices taskManagerServices = 
TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, 
resourceID);
+               TaskManagerServices taskManagerServices = 
TaskManagerServices.fromConfiguration(
+                       taskManagerServicesConfiguration,
+                       resourceID);
 
                TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
 
+               TaskManagerMetricGroup taskManagerMetricGroup = new 
TaskManagerMetricGroup(
+                       metricRegistry,
+                       
taskManagerServices.getTaskManagerLocation().getHostname(),
+                       resourceID.toString());
+
+               // Initialize the TM metrics
+               
TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, 
taskManagerServices.getNetworkEnvironment());
+
                this.taskManager = new TaskExecutor(
                        taskManagerConfiguration,
                        taskManagerServices.getTaskManagerLocation(),
@@ -93,8 +107,8 @@ public class TaskManagerRunner implements FatalErrorHandler {
                        taskManagerServices.getIOManager(),
                        taskManagerServices.getNetworkEnvironment(),
                        highAvailabilityServices,
-                       taskManagerServices.getMetricRegistry(),
-                       taskManagerServices.getTaskManagerMetricGroup(),
+                       metricRegistry,
+                       taskManagerMetricGroup,
                        taskManagerServices.getBroadcastVariableManager(),
                        taskManagerServices.getFileCache(),
                        taskManagerServices.getTaskSlotTable(),
@@ -117,7 +131,11 @@ public class TaskManagerRunner implements 
FatalErrorHandler {
 
        protected void shutDownInternally() {
                synchronized(lock) {
-                       taskManager.shutDown();
+                       try {
+                               taskManager.shutDown();
+                       } catch (Exception e) {
+                               LOG.error("Could not properly shut down the 
task manager.", e);
+                       }
                }
        }
 

Reply via email to