[FLINK-4505] [cluster mngt] Separate TaskManager service configuration from 
TaskManagerConfiguration; Implement TaskManagerRunner

Refactors the startup logic so that it is easier to reuse.

This closes #2461.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb781aef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb781aef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb781aef

Branch: refs/heads/flip-6
Commit: bb781aef10b6c7099c83678a95c1d0db79cdbe3d
Parents: c34f13c
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Sep 21 12:33:15 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:41 2016 +0200

----------------------------------------------------------------------
 .../HighAvailabilityServicesUtils.java          |  41 +
 .../flink/runtime/rpc/RpcServiceUtils.java      |  73 ++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |   2 +
 .../runtime/taskexecutor/TaskExecutor.java      |  51 +-
 .../taskexecutor/TaskExecutorConfiguration.java | 142 ----
 .../taskexecutor/TaskManagerConfiguration.java  | 205 +++++
 .../runtime/taskexecutor/TaskManagerRunner.java | 172 +++++
 .../taskexecutor/TaskManagerServices.java       | 320 ++++++++
 .../TaskManagerServicesConfiguration.java       | 325 ++++++++
 .../runtime/taskmanager/TaskManagerRunner.java  | 749 -------------------
 .../runtime/util/LeaderRetrievalUtils.java      |   7 +
 .../apache/flink/runtime/akka/AkkaUtils.scala   |   4 +
 .../NetworkEnvironmentConfiguration.scala       |   2 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   6 +-
 .../io/network/NetworkEnvironmentTest.java      |   4 +-
 .../runtime/rpc/TestingSerialRpcService.java    |   1 -
 .../runtime/taskexecutor/TaskExecutorTest.java  |  29 +-
 ...askManagerComponentsStartupShutdownTest.java |   3 +-
 .../TaskManagerConfigurationTest.java           |   1 -
 19 files changed, 1195 insertions(+), 942 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
new file mode 100644
index 0000000..f3da847
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+
+/**
+ * Utilities for instantiating {@link HighAvailabilityServices} from a configuration.
+ */
+public class HighAvailabilityServicesUtils {
+
+       /**
+        * Creates the high availability services for the recovery mode that
+        * {@link LeaderRetrievalUtils#getRecoveryMode(Configuration)} derives from the configuration.
+        *
+        * @param configuration configuration from which the recovery mode is read
+        * @return a {@link NonHaServices} instance when the mode is {@code NONE}
+        * @throws UnsupportedOperationException if the mode is {@code ZOOKEEPER}
+        * @throws Exception if the mode is any other value
+        */
+       public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration) throws Exception {
+               final HighAvailabilityMode mode = LeaderRetrievalUtils.getRecoveryMode(configuration);
+
+               switch (mode) {
+                       case ZOOKEEPER:
+                               throw new UnsupportedOperationException("ZooKeeper high availability services " +
+                                       "have not been implemented yet.");
+                       case NONE: {
+                               // no resource manager address is supplied here; the services are created with null
+                               final String noResourceManagerAddress = null;
+                               return new NonHaServices(noResourceManagerAddress);
+                       }
+                       default:
+                               throw new Exception("Recovery mode " + mode + " is not supported.");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
new file mode 100644
index 0000000..d40e336
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.actor.ActorSystem;
+import com.typesafe.config.Config;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.NetUtils;
+import org.jboss.netty.channel.ChannelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Utilities for creating an {@link RpcService} backed by an akka {@link ActorSystem}.
+ */
+public class RpcServiceUtils {
+       private static final Logger LOG = LoggerFactory.getLogger(RpcServiceUtils.class);
+
+       /**
+        * Utility method to create RPC service from configuration and hostname, port.
+        *
+        * @param hostname The hostname/address the actor system of the RPC service is bound to.
+        * @param port The port the actor system of the RPC service is bound to.
+        * @param configuration The configuration from which the akka config and the timeout are derived.
+        * @return The {@link AkkaRpcService} wrapping the newly created actor system.
+        * @throws IOException Thrown, if the actor system can not bind to the address
+        * @throws Exception Thrown if some other error occurs while creating akka actor system
+        */
+       public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception {
+               LOG.info("Starting AkkaRpcService at {}.", NetUtils.hostAndPortToUrlString(hostname, port));
+
+               // Resolve the timeout before the actor system is started, so that an invalid
+               // timeout setting cannot leave a running actor system behind.
+               final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
+
+               final ActorSystem actorSystem;
+
+               try {
+                       Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+
+                       LOG.debug("Using akka configuration \n {}.", akkaConfig);
+
+                       actorSystem = AkkaUtils.createActorSystem(akkaConfig);
+               } catch (Throwable t) {
+                       // instanceof is false for null, so no separate null check on the cause is needed
+                       if (t instanceof ChannelException && t.getCause() instanceof java.net.BindException) {
+                               final Throwable cause = t.getCause();
+                               String address = NetUtils.hostAndPortToUrlString(hostname, port);
+                               throw new IOException("Unable to bind AkkaRpcService actor system to address " +
+                                       address + " - " + cause.getMessage(), t);
+                       }
+                       throw new Exception("Could not create TaskManager actor system", t);
+               }
+
+               return new AkkaRpcService(actorSystem, timeout);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 6825557..fb7896a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -93,6 +93,8 @@ public class AkkaRpcService implements RpcService {
 
                Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
 
+
+
                if (actorSystemAddress.host().isDefined()) {
                        address = actorSystemAddress.host().get();
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 8ce2780..7df0a91 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -18,16 +18,14 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.jboss.netty.channel.ChannelException;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -39,7 +37,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.net.BindException;
+
 import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -60,7 +58,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        private final HighAvailabilityServices haServices;
 
        /** The task manager configuration */
-       private final TaskExecutorConfiguration taskExecutorConfig;
+       private final TaskManagerConfiguration taskManagerConfiguration;
 
        /** The I/O manager component in the task manager */
        private final IOManager ioManager;
@@ -71,9 +69,15 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        /** The network component in the task manager */
        private final NetworkEnvironment networkEnvironment;
 
+       /** The metric registry in the task manager */
+       private final MetricRegistry metricRegistry;
+
        /** The number of slots in the task manager, should be 1 for YARN */
        private final int numberOfSlots;
 
+       /** The fatal error handler to use in case of a fatal error */
+       private final FatalErrorHandler fatalErrorHandler;
+
        // --------- resource manager --------
 
        private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
@@ -81,26 +85,30 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        // 
------------------------------------------------------------------------
 
        public TaskExecutor(
-               TaskExecutorConfiguration taskExecutorConfig,
+               TaskManagerConfiguration taskManagerConfiguration,
                TaskManagerLocation taskManagerLocation,
                RpcService rpcService,
                MemoryManager memoryManager,
                IOManager ioManager,
                NetworkEnvironment networkEnvironment,
-               HighAvailabilityServices haServices) {
+               HighAvailabilityServices haServices,
+               MetricRegistry metricRegistry,
+               FatalErrorHandler fatalErrorHandler) {
 
                super(rpcService);
 
-               checkArgument(taskExecutorConfig.getNumberOfSlots() > 0, "The 
number of slots has to be larger than 0.");
+               checkArgument(taskManagerConfiguration.getNumberSlots() > 0, 
"The number of slots has to be larger than 0.");
 
-               this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
+               this.taskManagerConfiguration = 
checkNotNull(taskManagerConfiguration);
                this.taskManagerLocation = checkNotNull(taskManagerLocation);
                this.memoryManager = checkNotNull(memoryManager);
                this.ioManager = checkNotNull(ioManager);
                this.networkEnvironment = checkNotNull(networkEnvironment);
                this.haServices = checkNotNull(haServices);
+               this.metricRegistry = checkNotNull(metricRegistry);
+               this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 
-               this.numberOfSlots =  taskExecutorConfig.getNumberOfSlots();
+               this.numberOfSlots =  taskManagerConfiguration.getNumberSlots();
        }
 
        // 
------------------------------------------------------------------------
@@ -158,6 +166,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
+       /**
         * Requests a slot from the TaskManager
         *
         * @param allocationID id for the request
@@ -169,22 +178,6 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                return new SlotRequestRegistered(allocationID);
        }
 
-       /**
-                       public LeaderRetrievalService 
getJobMasterLeaderRetriever(JobID jobID) throws Exception {
-                               return null;
-                       }
-
-                       @Override
-                               return null;
-                       }
-
-                       @Override
-                       public CheckpointRecoveryFactory 
getCheckpointRecoveryFactory() throws Exception {
-                               return null;
-                       }
-
-                       @Override
-                       public SubmittedJobGraphStore 
getSubmittedJobGraphStore() throws Exception {
        // 
------------------------------------------------------------------------
        //  Properties
        // 
------------------------------------------------------------------------
@@ -222,7 +215,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        void onFatalError(Throwable t) {
                // to be determined, probably delegate to a fatal error handler 
that 
                // would either log (mini cluster) or kill the process (yarn, 
mesos, ...)
-               log.error("FATAL ERROR", t);
+               fatalErrorHandler.onFatalError(t);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
deleted file mode 100644
index c97c893..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskexecutor;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * {@link TaskExecutor} Configuration
- */
-public class TaskExecutorConfiguration implements Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       private final String[] tmpDirPaths;
-
-       private final long cleanupInterval;
-
-       private final int numberOfSlots;
-
-       private final Configuration configuration;
-
-       private final FiniteDuration timeout;
-       private final FiniteDuration maxRegistrationDuration;
-       private final FiniteDuration initialRegistrationPause;
-       private final FiniteDuration maxRegistrationPause;
-       private final FiniteDuration refusedRegistrationPause;
-
-       private final NetworkEnvironmentConfiguration networkConfig;
-
-       public TaskExecutorConfiguration(
-                       String[] tmpDirPaths,
-                       long cleanupInterval,
-                       NetworkEnvironmentConfiguration networkConfig,
-                       FiniteDuration timeout,
-                       FiniteDuration maxRegistrationDuration,
-                       int numberOfSlots,
-                       Configuration configuration) {
-
-               this (tmpDirPaths,
-                       cleanupInterval,
-                       networkConfig,
-                       timeout,
-                       maxRegistrationDuration,
-                       numberOfSlots,
-                       configuration,
-                       new FiniteDuration(500, TimeUnit.MILLISECONDS),
-                       new FiniteDuration(30, TimeUnit.SECONDS),
-                       new FiniteDuration(10, TimeUnit.SECONDS));
-       }
-
-       public TaskExecutorConfiguration(
-                       String[] tmpDirPaths,
-                       long cleanupInterval,
-                       NetworkEnvironmentConfiguration networkConfig,
-                       FiniteDuration timeout,
-                       FiniteDuration maxRegistrationDuration,
-                       int numberOfSlots,
-                       Configuration configuration,
-                       FiniteDuration initialRegistrationPause,
-                       FiniteDuration maxRegistrationPause,
-                       FiniteDuration refusedRegistrationPause) {
-
-               this.tmpDirPaths = checkNotNull(tmpDirPaths);
-               this.cleanupInterval = checkNotNull(cleanupInterval);
-               this.networkConfig = checkNotNull(networkConfig);
-               this.timeout = checkNotNull(timeout);
-               this.maxRegistrationDuration = maxRegistrationDuration;
-               this.numberOfSlots = checkNotNull(numberOfSlots);
-               this.configuration = checkNotNull(configuration);
-               this.initialRegistrationPause = 
checkNotNull(initialRegistrationPause);
-               this.maxRegistrationPause = checkNotNull(maxRegistrationPause);
-               this.refusedRegistrationPause = 
checkNotNull(refusedRegistrationPause);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Properties
-       // 
--------------------------------------------------------------------------------------------
-
-       public String[] getTmpDirPaths() {
-               return tmpDirPaths;
-       }
-
-       public long getCleanupInterval() {
-               return cleanupInterval;
-       }
-
-       public NetworkEnvironmentConfiguration getNetworkConfig() { return 
networkConfig; }
-
-       public FiniteDuration getTimeout() {
-               return timeout;
-       }
-
-       public FiniteDuration getMaxRegistrationDuration() {
-               return maxRegistrationDuration;
-       }
-
-       public int getNumberOfSlots() {
-               return numberOfSlots;
-       }
-
-       public Configuration getConfiguration() {
-               return configuration;
-       }
-
-       public FiniteDuration getInitialRegistrationPause() {
-               return initialRegistrationPause;
-       }
-
-       public FiniteDuration getMaxRegistrationPause() {
-               return maxRegistrationPause;
-       }
-
-       public FiniteDuration getRefusedRegistrationPause() {
-               return refusedRegistrationPause;
-       }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
new file mode 100644
index 0000000..32eb8c1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.io.File;
+
+/**
+ * Configuration object for {@link TaskExecutor}.
+ *
+ * <p>Bundles the number of slots, the temporary directories, the message timeout, the
+ * registration timing parameters and the cleanup interval. Instances are usually obtained
+ * via {@link #fromConfiguration(Configuration)}.
+ */
+public class TaskManagerConfiguration {
+
+       private static final Logger LOG = LoggerFactory.getLogger(TaskManagerConfiguration.class);
+
+       private final int numberSlots;
+
+       private final String[] tmpDirPaths;
+
+       private final Time timeout;
+
+       /** Maximum registration duration; {@code null} if the configured duration is not finite. */
+       private final Time maxRegistrationDuration;
+       private final Time initialRegistrationPause;
+       private final Time maxRegistrationPause;
+       private final Time refusedRegistrationPause;
+
+       private final long cleanupInterval;
+
+       /**
+        * Creates a new task manager configuration.
+        *
+        * @param numberSlots number of slots
+        * @param tmpDirPaths temporary directory paths, must not be null
+        * @param timeout message timeout, must not be null
+        * @param maxRegistrationDuration maximum registration duration, or {@code null} if it is
+        *                                not finite (this is what {@link #fromConfiguration} passes
+        *                                for a non-finite setting)
+        * @param initialRegistrationPause initial registration pause, must not be null
+        * @param maxRegistrationPause maximum registration pause, must not be null
+        * @param refusedRegistrationPause pause after a refused registration, must not be null
+        * @param cleanupInterval cleanup interval of the library cache manager
+        */
+       public TaskManagerConfiguration(
+               int numberSlots,
+               String[] tmpDirPaths,
+               Time timeout,
+               Time maxRegistrationDuration,
+               Time initialRegistrationPause,
+               Time maxRegistrationPause,
+               Time refusedRegistrationPause,
+               long cleanupInterval) {
+
+               this.numberSlots = numberSlots;
+               this.tmpDirPaths = Preconditions.checkNotNull(tmpDirPaths);
+               this.timeout = Preconditions.checkNotNull(timeout);
+               // deliberately not null-checked: null encodes a non-finite maximum registration duration
+               this.maxRegistrationDuration = maxRegistrationDuration;
+               this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
+               this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
+               this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
+               // primitive value, a null check would only box it
+               this.cleanupInterval = cleanupInterval;
+       }
+
+       public int getNumberSlots() {
+               return numberSlots;
+       }
+
+       public String[] getTmpDirPaths() {
+               return tmpDirPaths;
+       }
+
+       public Time getTimeout() {
+               return timeout;
+       }
+
+       /**
+        * Returns the maximum registration duration.
+        *
+        * @return the maximum registration duration, or {@code null} if it is not finite
+        */
+       public Time getMaxRegistrationDuration() {
+               return maxRegistrationDuration;
+       }
+
+       public Time getInitialRegistrationPause() {
+               return initialRegistrationPause;
+       }
+
+       public Time getMaxRegistrationPause() {
+               return maxRegistrationPause;
+       }
+
+       public Time getRefusedRegistrationPause() {
+               return refusedRegistrationPause;
+       }
+
+       public long getCleanupInterval() {
+               return cleanupInterval;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Static factory methods
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Reads the task manager settings from the given configuration.
+        *
+        * @param configuration configuration to read the values from
+        * @return the task manager configuration built from the given configuration
+        * @throws IllegalArgumentException if the timeout or one of the registration durations
+        *                                  cannot be parsed, or if a registration pause is not finite
+        */
+       public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
+               int numberSlots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+
+               // a configured value of -1 falls back to a single slot
+               if (numberSlots == -1) {
+                       numberSlots = 1;
+               }
+
+               final String[] tmpDirPaths = configuration.getString(
+                       ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+                       ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+               final Time timeout;
+
+               try {
+                       timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
+               } catch (Exception e) {
+                       // keep the original failure as cause so the offending value can be diagnosed
+                       throw new IllegalArgumentException(
+                               "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT +
+                                       "'.Use formats like '50 s' or '1 min' to specify the timeout.", e);
+               }
+
+               LOG.info("Messages have a max timeout of {}", timeout);
+
+               // the configured value is multiplied by 1000 (presumably seconds to milliseconds - confirm)
+               final long cleanupInterval = configuration.getLong(
+                       ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+                       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+
+               final Time finiteRegistrationDuration;
+
+               try {
+                       Duration maxRegistrationDuration = Duration.create(configuration.getString(
+                               ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
+                               ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
+                       if (maxRegistrationDuration.isFinite()) {
+                               // millisecond resolution, so that sub-second settings are not truncated to 0
+                               finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
+                       } else {
+                               finiteRegistrationDuration = null;
+                       }
+               } catch (NumberFormatException e) {
+                       throw new IllegalArgumentException("Invalid format for parameter " +
+                               ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e);
+               }
+
+               final Time initialRegistrationPause = parseFiniteRegistrationPause(
+                       configuration,
+                       ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+                       ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+                       "initial");
+
+               final Time maxRegistrationPause = parseFiniteRegistrationPause(
+                       configuration,
+                       ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
+                       ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE,
+                       "maximum");
+
+               final Time refusedRegistrationPause = parseFiniteRegistrationPause(
+                       configuration,
+                       ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
+                       ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
+                       "refused");
+
+               return new TaskManagerConfiguration(
+                       numberSlots,
+                       tmpDirPaths,
+                       timeout,
+                       finiteRegistrationDuration,
+                       initialRegistrationPause,
+                       maxRegistrationPause,
+                       refusedRegistrationPause,
+                       cleanupInterval);
+       }
+
+       /**
+        * Parses one registration pause setting and requires it to be finite.
+        *
+        * @param configuration configuration to read the value from
+        * @param key configuration key of the pause; also named in the parse error message
+        * @param defaultValue value used if the key is not set
+        * @param pauseName name of the pause ("initial", "maximum", "refused") for the error message
+        * @return the pause with millisecond resolution
+        * @throws IllegalArgumentException if the value cannot be parsed or is not finite
+        */
+       private static Time parseFiniteRegistrationPause(
+               Configuration configuration,
+               String key,
+               String defaultValue,
+               String pauseName) {
+
+               final Duration pause;
+               try {
+                       pause = Duration.create(configuration.getString(key, defaultValue));
+               } catch (NumberFormatException e) {
+                       throw new IllegalArgumentException("Invalid format for parameter " + key, e);
+               }
+
+               if (!pause.isFinite()) {
+                       throw new IllegalArgumentException(
+                               "The " + pauseName + " registration pause must be finite: " + pause);
+               }
+
+               // millisecond resolution, so that sub-second pauses are not truncated to 0
+               return Time.milliseconds(pause.toMillis());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
new file mode 100644
index 0000000..8ac0ddd
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is the executable entry point for the task manager in yarn or 
standalone mode.
+ * It constructs the related components (network, I/O manager, memory manager, 
RPC service, HA service)
+ * and starts them.
+ */
+public class TaskManagerRunner implements FatalErrorHandler {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerRunner.class);
+
+       private final Object lock = new Object();
+
+       private final Configuration configuration;
+
+       private final ResourceID resourceID;
+
+       private final RpcService rpcService;
+
+       private final HighAvailabilityServices highAvailabilityServices;
+
+       /** Executor used to run future callbacks */
+       private final Executor executor;
+
+       private final TaskExecutor taskManager;
+
+       public TaskManagerRunner(
+               Configuration configuration,
+               ResourceID resourceID,
+               RpcService rpcService,
+               HighAvailabilityServices highAvailabilityServices,
+               Executor executor) throws Exception {
+
+               this.configuration = Preconditions.checkNotNull(configuration);
+               this.resourceID = Preconditions.checkNotNull(resourceID);
+               this.rpcService = Preconditions.checkNotNull(rpcService);
+               this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityServices);
+               this.executor = rpcService.getExecutor();
+
+               InetAddress remoteAddress = 
InetAddress.getByName(rpcService.getAddress());
+
+               TaskManagerServicesConfiguration 
taskManagerServicesConfiguration = 
TaskManagerServicesConfiguration.fromConfiguration(
+                       configuration,
+                       remoteAddress,
+                       false);
+
+               TaskManagerServices taskManagerServices = 
TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, 
resourceID);
+
+               TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
+
+               this.taskManager = new TaskExecutor(
+                       taskManagerConfiguration,
+                       taskManagerServices.getTaskManagerLocation(),
+                       rpcService,
+                       taskManagerServices.getMemoryManager(),
+                       taskManagerServices.getIOManager(),
+                       taskManagerServices.getNetworkEnvironment(),
+                       highAvailabilityServices,
+                       taskManagerServices.getMetricRegistry(),
+                       this);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Lifecycle management
+       // 
--------------------------------------------------------------------------------------------
+
+       public void start() {
+               taskManager.start();
+       }
+
+       public void shutDown(Throwable cause) {
+               shutDownInternally();
+       }
+
+       protected void shutDownInternally() {
+               synchronized(lock) {
+                       taskManager.shutDown();
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  FatalErrorHandler methods
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void onFatalError(Throwable exception) {
+               LOG.error("Fatal error occurred while executing the 
TaskManager. Shutting it down...", exception);
+               shutDown(exception);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Static utilities
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Create a RPC service for the task manager.
+        *
+        * @param configuration The configuration for the TaskManager.
+        * @param haServices to use for the task manager hostname retrieval
+        */
+       public static RpcService createRpcService(
+               final Configuration configuration,
+               final HighAvailabilityServices haServices) throws Exception {
+
+               checkNotNull(configuration);
+               checkNotNull(haServices);
+
+               String taskManagerHostname = 
configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+
+               if (taskManagerHostname != null) {
+                       LOG.info("Using configured hostname/address for 
TaskManager: {}.", taskManagerHostname);
+               } else {
+                       Time lookupTimeout = 
Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis());
+
+                       InetAddress taskManagerAddress = 
LeaderRetrievalUtils.findConnectingAddress(
+                               haServices.getResourceManagerLeaderRetriever(),
+                               lookupTimeout);
+
+                       taskManagerHostname = taskManagerAddress.getHostName();
+
+                       LOG.info("TaskManager will use hostname/address '{}' 
({}) for communication.",
+                               taskManagerHostname, 
taskManagerAddress.getHostAddress());
+               }
+
+               final int rpcPort = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+
+               Preconditions.checkState(rpcPort < 0 || rpcPort >65535, 
"Invalid value for " +
+                               "'%s' (port for the TaskManager actor system) : 
%d - Leave config parameter empty or " +
+                               "use 0 to let the system choose port 
automatically.",
+                       ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
+
+               return RpcServiceUtils.createRpcService(taskManagerHostname, 
rpcPort, configuration);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
new file mode 100644
index 0000000..ff7f7d5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Container for {@link TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager},
+ * {@link NetworkEnvironment} and the {@link MetricRegistry}.
+ */
+public class TaskManagerServices {
+       private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class);
+
+       /** TaskManager services */
+       private final TaskManagerLocation taskManagerLocation;
+       private final MemoryManager memoryManager;
+       private final IOManager ioManager;
+       private final NetworkEnvironment networkEnvironment;
+       private final MetricRegistry metricRegistry;
+
+       /**
+        * Creates the service container. None of the given services may be null.
+        *
+        * @param taskManagerLocation location of the task manager
+        * @param memoryManager memory manager of the task manager
+        * @param ioManager I/O manager of the task manager
+        * @param networkEnvironment network environment of the task manager
+        * @param metricRegistry metric registry of the task manager
+        */
+       private TaskManagerServices(
+               TaskManagerLocation taskManagerLocation,
+               MemoryManager memoryManager,
+               IOManager ioManager,
+               NetworkEnvironment networkEnvironment,
+               MetricRegistry metricRegistry) {
+
+               this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
+               this.memoryManager = Preconditions.checkNotNull(memoryManager);
+               this.ioManager = Preconditions.checkNotNull(ioManager);
+               this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
+               this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Getter/Setter
+       // --------------------------------------------------------------------------------------------
+
+       public MemoryManager getMemoryManager() {
+               return memoryManager;
+       }
+
+       public IOManager getIOManager() {
+               return ioManager;
+       }
+
+       public NetworkEnvironment getNetworkEnvironment() {
+               return networkEnvironment;
+       }
+
+       public TaskManagerLocation getTaskManagerLocation() {
+               return taskManagerLocation;
+       }
+
+       public MetricRegistry getMetricRegistry() {
+               return metricRegistry;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Static factory methods for task manager services
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Creates and returns the task manager services.
+        *
+        * <p>The network environment is created and started first, then the memory manager, the
+        * I/O manager and the metric registry are created.
+        *
+        * @param taskManagerServicesConfiguration task manager configuration
+        * @param resourceID resource ID of the task manager
+        * @return task manager components
+        * @throws Exception Thrown if one of the services cannot be created or the network
+        *                   environment cannot be started.
+        */
+       public static TaskManagerServices fromConfiguration(
+               TaskManagerServicesConfiguration taskManagerServicesConfiguration,
+               ResourceID resourceID) throws Exception {
+
+               final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration);
+
+               // NOTE(review): the started network environment is not released if a later step fails
+               network.start();
+
+               final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
+                       resourceID,
+                       taskManagerServicesConfiguration.getTaskManagerAddress(),
+                       network.getConnectionManager().getDataPort());
+
+               // this call has to happen strictly after the network stack has been initialized
+               final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration);
+
+               // start the I/O manager, it will create some temp directories.
+               final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
+
+               final MetricRegistry metricsRegistry = new MetricRegistry(
+                       taskManagerServicesConfiguration.getMetricRegistryConfiguration());
+
+               return new TaskManagerServices(taskManagerLocation, memoryManager, ioManager, network, metricsRegistry);
+       }
+
+       /**
+        * Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}.
+        *
+        * <p>If a positive amount of memory is configured, it is interpreted as megabytes. Otherwise
+        * the size is derived from the configured memory fraction and either the currently free heap
+        * (heap memory) or the maximum JVM heap size (off-heap memory).
+        *
+        * @param taskManagerServicesConfiguration to create the memory manager from
+        * @return Memory manager
+        * @throws Exception Thrown if the memory for the memory manager cannot be allocated.
+        */
+       private static MemoryManager createMemoryManager(
+               TaskManagerServicesConfiguration taskManagerServicesConfiguration) throws Exception {
+               // computing the amount of memory to use depends on how much memory is available
+               // it strictly needs to happen AFTER the network stack has been initialized
+
+               MemoryType memType = taskManagerServicesConfiguration.getNetworkConfig().memoryType();
+
+               // check if a value has been configured
+               long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();
+
+               final long memorySize;
+
+               boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();
+
+               if (configuredMemory > 0) {
+                       if (preAllocateMemory) {
+                               LOG.info("Using {} MB for managed memory.", configuredMemory);
+                       } else {
+                               LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily.", configuredMemory);
+                       }
+                       memorySize = configuredMemory << 20; // megabytes to bytes
+               } else {
+                       float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();
+
+                       if (memType == MemoryType.HEAP) {
+                               long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * memoryFraction);
+                               if (preAllocateMemory) {
+                                       LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB).",
+                                               memoryFraction, relativeMemSize >> 20);
+                               } else {
+                                       LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
+                                               "memory will be allocated lazily.", memoryFraction, relativeMemSize >> 20);
+                               }
+                               memorySize = relativeMemSize;
+                       } else if (memType == MemoryType.OFF_HEAP) {
+                               // The maximum heap memory has been adjusted according to the fraction
+                               long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
+                               long directMemorySize = (long) (maxMemory / (1.0 - memoryFraction) * memoryFraction);
+                               if (preAllocateMemory) {
+                                       LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB).",
+                                               memoryFraction, directMemorySize >> 20);
+                               } else {
+                                       LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
+                                               " memory will be allocated lazily.", memoryFraction, directMemorySize >> 20);
+                               }
+                               memorySize = directMemorySize;
+                       } else {
+                               throw new RuntimeException("No supported memory type detected.");
+                       }
+               }
+
+               // now start the memory manager
+               final MemoryManager memoryManager;
+               try {
+                       memoryManager = new MemoryManager(
+                               memorySize,
+                               taskManagerServicesConfiguration.getNumberOfSlots(),
+                               taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(),
+                               memType,
+                               preAllocateMemory);
+               } catch (OutOfMemoryError e) {
+                       if (memType == MemoryType.HEAP) {
+                               throw new Exception("OutOfMemory error (" + e.getMessage() +
+                                       ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
+                       } else if (memType == MemoryType.OFF_HEAP) {
+                               throw new Exception("OutOfMemory error (" + e.getMessage() +
+                                       ") while allocating the TaskManager off-heap memory (" + memorySize +
+                                       " bytes). Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
+                       } else {
+                               throw e;
+                       }
+               }
+               return memoryManager;
+       }
+
+       /**
+        * Creates the {@link NetworkEnvironment} from the given {@link TaskManagerServicesConfiguration}.
+        *
+        * <p>The temp directories are validated before any network component is created. A netty
+        * based connection manager and a {@link KvStateServer} are only created if a netty
+        * configuration is present; otherwise a {@link LocalConnectionManager} and no
+        * {@link KvStateServer} are used.
+        *
+        * @param taskManagerServicesConfiguration to construct the network environment from
+        * @return Network environment
+        * @throws IOException Thrown if one of the temp directories is not usable.
+        */
+       private static NetworkEnvironment createNetworkEnvironment(
+               TaskManagerServicesConfiguration taskManagerServicesConfiguration) throws IOException {
+               // pre-start checks
+               checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
+
+               NetworkEnvironmentConfiguration networkEnvironmentConfiguration =
+                       taskManagerServicesConfiguration.getNetworkConfig();
+
+               NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+                       networkEnvironmentConfiguration.numNetworkBuffers(),
+                       networkEnvironmentConfiguration.networkBufferSize(),
+                       networkEnvironmentConfiguration.memoryType());
+
+               ConnectionManager connectionManager;
+
+               if (networkEnvironmentConfiguration.nettyConfig() != null) {
+                       connectionManager = new NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig());
+               } else {
+                       connectionManager = new LocalConnectionManager();
+               }
+
+               ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+               TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
+
+               KvStateRegistry kvStateRegistry = new KvStateRegistry();
+
+               KvStateServer kvStateServer;
+
+               if (networkEnvironmentConfiguration.nettyConfig() != null) {
+                       NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig();
+
+                       // a configured thread count of 0 means: use the number of slots
+                       int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ?
+                               nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads();
+
+                       int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ?
+                               nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads();
+
+                       kvStateServer = new KvStateServer(
+                               taskManagerServicesConfiguration.getTaskManagerAddress(),
+                               networkEnvironmentConfiguration.queryServerPort(),
+                               numNetworkThreads,
+                               numQueryThreads,
+                               kvStateRegistry,
+                               new DisabledKvStateRequestStats());
+               } else {
+                       kvStateServer = null;
+               }
+
+               // we start the network first, to make sure it can allocate its buffers first
+               final NetworkEnvironment network = new NetworkEnvironment(
+                       networkBufferPool,
+                       connectionManager,
+                       resultPartitionManager,
+                       taskEventDispatcher,
+                       kvStateRegistry,
+                       kvStateServer,
+                       networkEnvironmentConfiguration.ioMode(),
+                       networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
+                       networkEnvironmentConfiguration.partitinRequestMaxBackoff());
+
+               return network;
+       }
+
+       /**
+        * Validates that all the directories denoted by the strings do actually exist, are proper
+        * directories (not files), and are writable.
+        *
+        * @param tmpDirs       The array of directory paths to check.
+        * @throws IOException  Thrown if any of the directories does not exist or is not writable
+        *                   or is a file, rather than a directory.
+        * @throws IllegalArgumentException Thrown if any of the directory paths is null or empty.
+        */
+       private static void checkTempDirs(String[] tmpDirs) throws IOException {
+               for (int i = 0; i < tmpDirs.length; i++) {
+                       final String dir = tmpDirs[i];
+
+                       if (dir == null || dir.equals("")) {
+                               // report the actual position of the offending entry
+                               throw new IllegalArgumentException("Temporary file directory #" + i + " is null or empty.");
+                       }
+
+                       File file = new File(dir);
+                       if (!file.exists()) {
+                               throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist.");
+                       }
+                       if (!file.isDirectory()) {
+                               throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory.");
+                       }
+                       if (!file.canWrite()) {
+                               throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable.");
+                       }
+
+                       if (LOG.isInfoEnabled()) {
+                               long totalSpace = file.getTotalSpace();
+                               long usableSpace = file.getUsableSpace();
+                               // computed on bytes: the GB values are truncated and may be 0 for small volumes
+                               double usablePercentage = totalSpace > 0 ? (double) usableSpace / totalSpace * 100 : 0.0;
+                               String path = file.getAbsolutePath();
+                               LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)",
+                                       path, totalSpace >> 30, usableSpace >> 30, usablePercentage));
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb781aef/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
new file mode 100644
index 0000000..66d969a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.HybridMemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.util.MathUtils;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Configuration for the task manager services such as the network 
environment, the memory manager,
+ * the io manager and the metric registry
+ */
+public class TaskManagerServicesConfiguration {
+
+       private final InetAddress taskManagerAddress;
+
+       private final String[] tmpDirPaths;
+
+       private final int numberOfSlots;
+
+       private final NetworkEnvironmentConfiguration networkConfig;
+
+       private final long configuredMemory;
+
+       private final boolean preAllocateMemory;
+
+       private final float memoryFraction;
+
+       private final MetricRegistryConfiguration metricRegistryConfiguration;
+
+       public TaskManagerServicesConfiguration(
+               InetAddress taskManagerAddress,
+               String[] tmpDirPaths,
+               NetworkEnvironmentConfiguration networkConfig,
+               int numberOfSlots,
+               long configuredMemory,
+               boolean preAllocateMemory,
+               float memoryFraction,
+               MetricRegistryConfiguration metricRegistryConfiguration) {
+
+               this.taskManagerAddress = checkNotNull(taskManagerAddress);
+               this.tmpDirPaths = checkNotNull(tmpDirPaths);
+               this.networkConfig = checkNotNull(networkConfig);
+               this.numberOfSlots = checkNotNull(numberOfSlots);
+
+               this.configuredMemory = configuredMemory;
+               this.preAllocateMemory = preAllocateMemory;
+               this.memoryFraction = memoryFraction;
+
+               this.metricRegistryConfiguration = 
checkNotNull(metricRegistryConfiguration);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Getter/Setter
+       // 
--------------------------------------------------------------------------------------------
+
+
+       public InetAddress getTaskManagerAddress() {
+               return taskManagerAddress;
+       }
+
+       public String[] getTmpDirPaths() {
+               return tmpDirPaths;
+       }
+
+       public NetworkEnvironmentConfiguration getNetworkConfig() { return 
networkConfig; }
+
+       public int getNumberOfSlots() {
+               return numberOfSlots;
+       }
+
+       public float getMemoryFraction() {
+               return memoryFraction;
+       }
+
+       public long getConfiguredMemory() {
+               return configuredMemory;
+       }
+
+       public boolean isPreAllocateMemory() {
+               return preAllocateMemory;
+       }
+
+       public MetricRegistryConfiguration getMetricRegistryConfiguration() {
+               return metricRegistryConfiguration;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Parsing of Flink configuration
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Utility method to extract TaskManager config parameters from the 
configuration and to
+        * sanity check them.
+        *
+        * @param configuration The configuration.
+        * @param remoteAddress identifying the IP address under which the 
TaskManager will be accessible
+        * @param localCommunication True, to skip initializing the network 
stack.
+        *                                      Use only in cases where only 
one task manager runs.
+        * @return TaskExecutorConfiguration that wrappers 
InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
+        */
+       public static TaskManagerServicesConfiguration fromConfiguration(
+               Configuration configuration,
+               InetAddress remoteAddress,
+               boolean localCommunication) throws Exception {
+
+               // we need this because many configs have been written with a 
"-1" entry
+               int slots = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+               if (slots == -1) {
+                       slots = 1;
+               }
+
+               final String[] tmpDirs = configuration.getString(
+                       ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+                       
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+               final NetworkEnvironmentConfiguration networkConfig = 
parseNetworkEnvironmentConfiguration(
+                       configuration,
+                       localCommunication,
+                       remoteAddress,
+                       slots);
+
+               // extract memory settings
+               long configuredMemory = 
configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+               checkConfigParameter(configuredMemory == -1 || configuredMemory 
> 0, configuredMemory,
+                       ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+                       "MemoryManager needs at least one MB of memory. " +
+                               "If you leave this config parameter empty, the 
system automatically " +
+                               "pick a fraction of the available memory.");
+
+               boolean preAllocateMemory = configuration.getBoolean(
+                       ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
+                       
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
+
+               float memoryFraction = configuration.getFloat(
+                       ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+                       ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+               checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 
1.0f, memoryFraction,
+                       ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+                       "MemoryManager fraction of the free memory must be 
between 0.0 and 1.0");
+
+               final MetricRegistryConfiguration metricRegistryConfiguration = 
MetricRegistryConfiguration.fromConfiguration(configuration);
+
+               return new TaskManagerServicesConfiguration(
+                       remoteAddress,
+                       tmpDirs,
+                       networkConfig,
+                       slots,
+                       configuredMemory,
+                       preAllocateMemory,
+                       memoryFraction,
+                       metricRegistryConfiguration);
+       }
+
+       // 
--------------------------------------------------------------------------
+       //  Parsing and checking the TaskManager Configuration
+       // 
--------------------------------------------------------------------------
+
+       /**
+        * Creates the {@link NetworkEnvironmentConfiguration} from the given 
{@link Configuration}.
+        *
+        * @param configuration to create the network environment configuration 
from
+        * @param localTaskManagerCommunication true if task manager 
communication is local
+        * @param taskManagerAddress address of the task manager
+        * @param slots to start the task manager with
+        * @return Network environment configuration
+        */
+       private static NetworkEnvironmentConfiguration 
parseNetworkEnvironmentConfiguration(
+               Configuration configuration,
+               boolean localTaskManagerCommunication,
+               InetAddress taskManagerAddress,
+               int slots) throws Exception {
+
+               // ----> hosts / ports for communication and data exchange
+
+               int dataport = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+                       ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
+
+               checkConfigParameter(dataport > 0, dataport, 
ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+                       "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+               checkConfigParameter(slots >= 1, slots, 
ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+                       "Number of task slots must be at least one.");
+
+               final int numNetworkBuffers = configuration.getInteger(
+                       ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+                       
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+
+               checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
+                       ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 
"");
+
+               final int pageSize = configuration.getInteger(
+                       ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+                       
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
+
+               // check page size of for minimum size
+               checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, 
pageSize,
+                       ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+                       "Minimum memory segment size is " + 
MemoryManager.MIN_PAGE_SIZE);
+
+               // check page size for power of two
+               checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
+                       ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+                       "Memory segment size must be a power of 2.");
+
+               // check whether we use heap or off-heap memory
+               final MemoryType memType;
+               if 
(configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, 
false)) {
+                       memType = MemoryType.OFF_HEAP;
+               } else {
+                       memType = MemoryType.HEAP;
+               }
+
+               // initialize the memory segment factory accordingly
+               if (memType == MemoryType.HEAP) {
+                       if 
(!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
+                               throw new Exception("Memory type is set to heap 
memory, but memory segment " +
+                                       "factory has been initialized for 
off-heap memory segments");
+                       }
+               } else {
+                       if 
(!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) 
{
+                               throw new Exception("Memory type is set to 
off-heap memory, but memory segment " +
+                                       "factory has been initialized for heap 
memory segments");
+                       }
+               }
+
+               final NettyConfig nettyConfig;
+               if (!localTaskManagerCommunication) {
+                       final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+                       nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(),
+                               taskManagerInetSocketAddress.getPort(), 
pageSize, slots, configuration);
+               } else {
+                       nettyConfig = null;
+               }
+
+               // Default spill I/O mode for intermediate results
+               final String syncOrAsync = configuration.getString(
+                       ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
+                       
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
+
+               final IOManager.IOMode ioMode;
+               if (syncOrAsync.equals("async")) {
+                       ioMode = IOManager.IOMode.ASYNC;
+               } else {
+                       ioMode = IOManager.IOMode.SYNC;
+               }
+
+               final int queryServerPort =  configuration.getInteger(
+                       ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
+                       ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT);
+
+               final int queryServerNetworkThreads =  configuration.getInteger(
+                       ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
+                       
ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS);
+
+               final int queryServerQueryThreads =  configuration.getInteger(
+                       ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
+                       
ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS);
+
+               return new NetworkEnvironmentConfiguration(
+                       numNetworkBuffers,
+                       pageSize,
+                       memType,
+                       ioMode,
+                       queryServerPort,
+                       queryServerNetworkThreads,
+                       queryServerQueryThreads,
+                       nettyConfig,
+                       500,
+                       3000);
+       }
+
+       /**
+        * Validates a condition for a config parameter and displays a standard 
exception, if the
+        * the condition does not hold.
+        *
+        * @param condition             The condition that must hold. If the 
condition is false, an exception is thrown.
+        * @param parameter         The parameter value. Will be shown in the 
exception message.
+        * @param name              The name of the config parameter. Will be 
shown in the exception message.
+        * @param errorMessage  The optional custom error message to append to 
the exception message.
+        */
+       private static void checkConfigParameter(
+               boolean condition,
+               Object parameter,
+               String name,
+               String errorMessage) {
+               if (!condition) {
+                       throw new IllegalConfigurationException("Invalid 
configuration value for " + name + " : " + parameter + " - " + errorMessage);
+               }
+       }
+}
+

Reply via email to