[hotfix] Replace TaskManager.createTaskManagerComponents by TaskManagerServices


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bce292ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bce292ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bce292ae

Branch: refs/heads/flip-6
Commit: bce292ae9eb15ba07598e26c96634c7eee45db9d
Parents: 1f198d8
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Sep 28 14:04:54 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:42 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/MesosTaskManager.scala     |   3 +-
 .../taskexecutor/TaskManagerConfiguration.java  |  25 +-
 .../TaskManagerServicesConfiguration.java       |   2 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |  47 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 605 ++-----------------
 .../taskmanager/TaskManagerConfiguration.scala  |  56 --
 ...askManagerComponentsStartupShutdownTest.java |  24 +-
 .../testingUtils/TestingTaskManager.scala       |   3 +-
 .../runtime/testingUtils/TestingUtils.scala     |   1 -
 .../flink/yarn/TestingYarnTaskManager.scala     |   3 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |   3 +-
 11 files changed, 126 insertions(+), 646 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bce292ae/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
index 3972a57..e8d6a58 100644
--- 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
+++ 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
@@ -24,7 +24,8 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.MetricRegistry
-import org.apache.flink.runtime.taskmanager.{TaskManager, 
TaskManagerConfiguration, TaskManagerLocation}
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 
 /** An extension of the TaskManager that listens for additional Mesos-related
   * messages.

http://git-wip-us.apache.org/repos/asf/flink/blob/bce292ae/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 32eb8c1..f58af77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -41,6 +41,7 @@ public class TaskManagerConfiguration {
        private final String[] tmpDirPaths;
 
        private final Time timeout;
+       // null indicates an infinite duration
        private final Time maxRegistrationDuration;
        private final Time initialRegistrationPause;
        private final Time maxRegistrationPause;
@@ -48,6 +49,9 @@ public class TaskManagerConfiguration {
 
        private final long cleanupInterval;
 
+       // TODO: remove necessity for complete configuration object
+       private final Configuration configuration;
+
        public TaskManagerConfiguration(
                int numberSlots,
                String[] tmpDirPaths,
@@ -56,16 +60,18 @@ public class TaskManagerConfiguration {
                Time initialRegistrationPause,
                Time maxRegistrationPause,
                Time refusedRegistrationPause,
-               long cleanupInterval) {
+               long cleanupInterval,
+               Configuration configuration) {
 
                this.numberSlots = numberSlots;
                this.tmpDirPaths = Preconditions.checkNotNull(tmpDirPaths);
                this.timeout = Preconditions.checkNotNull(timeout);
-               this.maxRegistrationDuration = 
Preconditions.checkNotNull(maxRegistrationDuration);
+               this.maxRegistrationDuration = maxRegistrationDuration;
                this.initialRegistrationPause = 
Preconditions.checkNotNull(initialRegistrationPause);
                this.maxRegistrationPause = 
Preconditions.checkNotNull(maxRegistrationPause);
                this.refusedRegistrationPause = 
Preconditions.checkNotNull(refusedRegistrationPause);
                this.cleanupInterval = 
Preconditions.checkNotNull(cleanupInterval);
+               this.configuration = Preconditions.checkNotNull(configuration);
        }
 
        public int getNumberSlots() {
@@ -100,6 +106,10 @@ public class TaskManagerConfiguration {
                return cleanupInterval;
        }
 
+       public Configuration getConfiguration() {
+               return configuration;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Static factory methods
        // 
--------------------------------------------------------------------------------------------
@@ -138,7 +148,7 @@ public class TaskManagerConfiguration {
                                
ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
                                
ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
                        if (maxRegistrationDuration.isFinite()) {
-                               finiteRegistrationDuration = 
Time.seconds(maxRegistrationDuration.toSeconds());
+                               finiteRegistrationDuration = 
Time.milliseconds(maxRegistrationDuration.toMillis());
                        } else {
                                finiteRegistrationDuration = null;
                        }
@@ -153,7 +163,7 @@ public class TaskManagerConfiguration {
                                
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
                                
ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE));
                        if (pause.isFinite()) {
-                               initialRegistrationPause = 
Time.seconds(pause.toSeconds());
+                               initialRegistrationPause = 
Time.milliseconds(pause.toMillis());
                        } else {
                                throw new IllegalArgumentException("The initial 
registration pause must be finite: " + pause);
                        }
@@ -168,7 +178,7 @@ public class TaskManagerConfiguration {
                                
ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
                                
ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE));
                        if (pause.isFinite()) {
-                               maxRegistrationPause = 
Time.seconds(pause.toSeconds());
+                               maxRegistrationPause = 
Time.milliseconds(pause.toMillis());
                        } else {
                                throw new IllegalArgumentException("The maximum 
registration pause must be finite: " + pause);
                        }
@@ -183,7 +193,7 @@ public class TaskManagerConfiguration {
                                
ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
                                
ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE));
                        if (pause.isFinite()) {
-                               refusedRegistrationPause = 
Time.seconds(pause.toSeconds());
+                               refusedRegistrationPause = 
Time.milliseconds(pause.toMillis());
                        } else {
                                throw new IllegalArgumentException("The refused 
registration pause must be finite: " + pause);
                        }
@@ -200,6 +210,7 @@ public class TaskManagerConfiguration {
                        initialRegistrationPause,
                        maxRegistrationPause,
                        refusedRegistrationPause,
-                       cleanupInterval);
+                       cleanupInterval,
+                       configuration);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bce292ae/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 66d969a..80dfc09 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -208,7 +208,7 @@ public class TaskManagerServicesConfiguration {
                int dataport = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
                        ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
 
-               checkConfigParameter(dataport > 0, dataport, 
ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+               checkConfigParameter(dataport >= 0, dataport, 
ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
                        "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
 
                checkConfigParameter(slots >= 1, slots, 
ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,

http://git-wip-us.apache.org/repos/asf/flink/blob/bce292ae/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 2f453a3..a5e229b 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.minicluster
 
+import java.net.InetAddress
 import java.util.concurrent.ExecutorService
 
 import akka.actor.{ActorRef, ActorSystem, Props}
@@ -42,8 +43,9 @@ import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages
 import 
org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, 
StoppingFailure, StoppingResponse}
 import org.apache.flink.runtime.metrics.MetricRegistry
-import org.apache.flink.runtime.taskmanager.{TaskManager, 
TaskManagerConfiguration, TaskManagerLocation}
-import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, 
TaskManagerServices, TaskManagerServicesConfiguration}
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
+import org.apache.flink.runtime.util.{EnvironmentInformation, 
LeaderRetrievalUtils}
 
 import scala.concurrent.Await
 import scala.concurrent.duration.FiniteDuration
@@ -195,31 +197,32 @@ class LocalFlinkMiniCluster(
 
     val resourceID = ResourceID.generate() // generate random resource id
 
-    val (taskManagerConfig,
-    taskManagerLocation,
-    memoryManager,
-    ioManager,
-    network,
-    leaderRetrievalService,
-    metricsRegistry) = TaskManager.createTaskManagerComponents(
+    val taskManagerAddress = InetAddress.getByName(hostname)
+
+    val taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(config)
+    val taskManagerServicesConfiguration = 
TaskManagerServicesConfiguration.fromConfiguration(
       config,
-      resourceID,
-      hostname, // network interface to bind to
-      localExecution, // start network stack?
-      Some(createLeaderRetrievalService()))
+      taskManagerAddress,
+      localExecution)
+
+    val taskManagerServices = TaskManagerServices.fromConfiguration(
+      taskManagerServicesConfiguration,
+      resourceID)
+
+    val metricRegistry = taskManagerServices.getMetricRegistry()
 
     val props = getTaskManagerProps(
       taskManagerClass,
-      taskManagerConfig,
+      taskManagerConfiguration,
       resourceID,
-      taskManagerLocation,
-      memoryManager,
-      ioManager,
-      network,
-      leaderRetrievalService,
-      metricsRegistry)
-
-    metricsRegistry.startQueryService(system)
+      taskManagerServices.getTaskManagerLocation(),
+      taskManagerServices.getMemoryManager(),
+      taskManagerServices.getIOManager(),
+      taskManagerServices.getNetworkEnvironment,
+      createLeaderRetrievalService(),
+      metricRegistry)
+
+    metricRegistry.startQueryService(system)
 
     system.actorOf(props, taskManagerActorName)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/bce292ae/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 79670a4..d16c1b0 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -37,7 +37,6 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
 import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, 
MemorySegmentFactory, MemoryType}
 import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
@@ -51,12 +50,8 @@ import 
org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager,
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.filecache.FileCache
 import org.apache.flink.runtime.instance.{AkkaActorGateway, 
HardwareDescription, InstanceID}
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
-import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool
-import org.apache.flink.runtime.io.network.{LocalConnectionManager, 
NetworkEnvironment, TaskEventDispatcher}
-import org.apache.flink.runtime.io.network.netty.{NettyConfig, 
NettyConnectionManager, PartitionStateChecker}
-import 
org.apache.flink.runtime.io.network.partition.{ResultPartitionConsumableNotifier,
 ResultPartitionManager}
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, 
LeaderRetrievalService}
 import org.apache.flink.runtime.memory.MemoryManager
@@ -66,16 +61,15 @@ import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.{ResponseStack
 import org.apache.flink.runtime.messages.TaskManagerMessages._
 import org.apache.flink.runtime.messages.TaskMessages._
 import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
NotifyCheckpointComplete, TriggerCheckpoint}
-import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, 
MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
-import org.apache.flink.runtime.query.KvStateRegistry
-import org.apache.flink.runtime.query.netty.{DisabledKvStateRequestStats, 
KvStateServer}
 import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, 
SecurityConfiguration}
 import org.apache.flink.runtime.security.SecurityContext
+import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, 
TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
-import org.apache.flink.util.{MathUtils, NetUtils}
+import org.apache.flink.util.NetUtils
 
 import scala.collection.JavaConverters._
 import scala.concurrent._
@@ -142,7 +136,7 @@ class TaskManager(
   override val log = Logger(getClass)
 
   /** The timeout for all actor ask futures */
-  protected val askTimeout = new Timeout(config.timeout)
+  protected val askTimeout = new Timeout(config.getTimeout().getSize, 
config.getTimeout().getUnit())
 
   /** The TaskManager's physical execution resources */
   protected val resources = 
HardwareDescription.extractFromSystem(memoryManager.getMemorySize())
@@ -154,7 +148,7 @@ class TaskManager(
   protected val bcVarManager = new BroadcastVariableManager()
 
   /** Handler for distributed files cached by this TaskManager */
-  protected val fileCache = new FileCache(config.configuration)
+  protected val fileCache = new FileCache(config.getConfiguration())
 
   /** Registry of metrics periodically transmitted to the JobManager */
   private val metricRegistry = TaskManager.createMetricsRegistry()
@@ -190,8 +184,8 @@ class TaskManager(
 
   private val runtimeInfo = new TaskManagerRuntimeInfo(
        location.getHostname(),
-       new UnmodifiableConfiguration(config.configuration),
-       config.tmpDirPaths)
+       new UnmodifiableConfiguration(config.getConfiguration()),
+       config.getTmpDirPaths())
 
   private var scheduledTaskManagerRegistration: Option[Cancellable] = None
   private var currentRegistrationRun: UUID = UUID.randomUUID()
@@ -614,7 +608,9 @@ class TaskManager(
             )
 
             // the next timeout computes via exponential backoff with cap
-            val nextTimeout = (timeout * 2).min(config.maxRegistrationPause)
+            val nextTimeout = (timeout * 2).min(new FiniteDuration(
+              config.getMaxRegistrationPause().toMilliseconds,
+              TimeUnit.MILLISECONDS))
 
             // schedule (with our timeout s delay) a check triggers a new 
registration
             // attempt, if we are not registered by then
@@ -688,10 +684,14 @@ class TaskManager(
 
           if(jobManagerAkkaURL.isDefined) {
             // try the registration again after some time
-            val delay: FiniteDuration = config.refusedRegistrationPause
-            val deadline: Option[Deadline] = 
config.maxRegistrationDuration.map {
-              timeout => timeout + delay fromNow
-            }
+            val delay: FiniteDuration = new FiniteDuration(
+              config.getRefusedRegistrationPause().getSize(),
+              config.getRefusedRegistrationPause().getUnit())
+            val deadline: Option[Deadline] = 
Option(config.getMaxRegistrationDuration())
+              .map {
+                duration => new FiniteDuration(duration.getSize(), 
duration.getUnit()) +
+                  delay fromNow
+              }
 
             // start a new registration run
             currentRegistrationRun = UUID.randomUUID()
@@ -703,7 +703,9 @@ class TaskManager(
                 self ! decorateMessage(
                   TriggerTaskManagerRegistration(
                     jobManagerAkkaURL.get,
-                    config.initialRegistrationPause,
+                    new FiniteDuration(
+                      config.getInitialRegistrationPause().getSize(),
+                      config.getInitialRegistrationPause().getUnit()),
                     deadline,
                     1,
                     currentRegistrationRun)
@@ -842,7 +844,7 @@ class TaskManager(
       requestType: LogTypeRequest,
       jobManager: ActorRef)
     : Unit = {
-    val logFilePathOption = Option(config.configuration.getString(
+    val logFilePathOption = Option(config.getConfiguration().getString(
       ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
System.getProperty("log.file")));
     logFilePathOption match {
       case None => throw new IOException("TaskManager log files are 
unavailable. " +
@@ -975,9 +977,10 @@ class TaskManager(
       log.info(s"Determined BLOB server address to be $address. Starting BLOB 
cache.")
 
       try {
-        val blobcache = new BlobCache(address, config.configuration)
+        val blobcache = new BlobCache(address, config.getConfiguration())
         blobService = Option(blobcache)
-        libraryCacheManager = Some(new BlobLibraryCacheManager(blobcache, 
config.cleanupInterval))
+        libraryCacheManager = Some(
+          new BlobLibraryCacheManager(blobcache, config.getCleanupInterval()))
       }
       catch {
         case e: Exception =>
@@ -1160,7 +1163,9 @@ class TaskManager(
         tdd.getJobID,
         tdd.getVertexID,
         tdd.getExecutionId,
-        config.timeout)
+        new FiniteDuration(
+          config.getTimeout().getSize(),
+          config.getTimeout().getUnit()))
 
       val task = new Task(
         tdd,
@@ -1427,7 +1432,8 @@ class TaskManager(
   def triggerTaskManagerRegistration(): Unit = {
     if(jobManagerAkkaURL.isDefined) {
       // begin attempts to reconnect
-      val deadline: Option[Deadline] = 
config.maxRegistrationDuration.map(_.fromNow)
+      val deadline: Option[Deadline] = 
Option(config.getMaxRegistrationDuration())
+        .map{ duration => new FiniteDuration(duration.getSize(), 
duration.getUnit()).fromNow }
 
       // start a new registration run
       currentRegistrationRun = UUID.randomUUID()
@@ -1437,7 +1443,9 @@ class TaskManager(
       self ! decorateMessage(
         TriggerTaskManagerRegistration(
           jobManagerAkkaURL.get,
-          config.initialRegistrationPause,
+          new FiniteDuration(
+            config.getInitialRegistrationPause().getSize(),
+            config.getInitialRegistrationPause().getUnit()),
           deadline,
           1,
           currentRegistrationRun)
@@ -1844,32 +1852,37 @@ object TaskManager {
       taskManagerClass: Class[_ <: TaskManager])
     : ActorRef = {
 
-    val (taskManagerConfig,
-      connectionInfo,
-      memoryManager,
-      ioManager,
-      network,
-      leaderRetrievalService,
-      metricsRegistry) = createTaskManagerComponents(
-      configuration,
-      resourceID,
-      taskManagerHostname,
-      localTaskManagerCommunication,
-      leaderRetrievalServiceOption)
+    val taskManagerAddress = InetAddress.getByName(taskManagerHostname)
+
+    val taskManagerServicesConfiguration = TaskManagerServicesConfiguration
+      .fromConfiguration(configuration, taskManagerAddress, false)
+
+    val taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration)
+
+    val taskManagerServices = TaskManagerServices.fromConfiguration(
+      taskManagerServicesConfiguration,
+      resourceID)
+
+    val metricRegistry = taskManagerServices.getMetricRegistry()
+
+    val leaderRetrievalService = leaderRetrievalServiceOption match {
+      case Some(lrs) => lrs
+      case None => 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
+    }
 
     // create the actor properties (which define the actor constructor 
parameters)
     val tmProps = getTaskManagerProps(
       taskManagerClass,
-      taskManagerConfig,
+      taskManagerConfiguration,
       resourceID,
-      connectionInfo,
-      memoryManager,
-      ioManager,
-      network,
+      taskManagerServices.getTaskManagerLocation(),
+      taskManagerServices.getMemoryManager(),
+      taskManagerServices.getIOManager(),
+      taskManagerServices.getNetworkEnvironment(),
       leaderRetrievalService,
-      metricsRegistry)
+      metricRegistry)
 
-    metricsRegistry.startQueryService(actorSystem)
+    metricRegistry.startQueryService(actorSystem)
 
     taskManagerActorName match {
       case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
@@ -1896,211 +1909,11 @@ object TaskManager {
       memoryManager,
       ioManager,
       networkEnvironment,
-      taskManagerConfig.numberOfSlots,
+      taskManagerConfig.getNumberSlots(),
       leaderRetrievalService,
       metricsRegistry)
   }
 
-  def createTaskManagerComponents(
-    configuration: Configuration,
-    resourceID: ResourceID,
-    taskManagerHostname: String,
-    localTaskManagerCommunication: Boolean,
-    leaderRetrievalServiceOption: Option[LeaderRetrievalService]):
-      (TaskManagerConfiguration,
-      TaskManagerLocation,
-      MemoryManager,
-      IOManager,
-      NetworkEnvironment,
-      LeaderRetrievalService,
-      FlinkMetricRegistry) = {
-
-    val (taskManagerConfig : TaskManagerConfiguration,
-    netConfig: NetworkEnvironmentConfiguration,
-    taskManagerAddress: InetSocketAddress,
-    memType: MemoryType
-      ) = parseTaskManagerConfiguration(
-      configuration,
-      taskManagerHostname,
-      localTaskManagerCommunication)
-
-    // pre-start checks
-    checkTempDirs(taskManagerConfig.tmpDirPaths)
-
-    val networkBufferPool = new NetworkBufferPool(
-      netConfig.numNetworkBuffers,
-      netConfig.networkBufferSize,
-      netConfig.memoryType)
-
-    val connectionManager = Option(netConfig.nettyConfig) match {
-      case Some(nettyConfig) => new NettyConnectionManager(nettyConfig)
-      case None => new LocalConnectionManager()
-    }
-
-    val resultPartitionManager = new ResultPartitionManager()
-    val taskEventDispatcher = new TaskEventDispatcher()
-
-    val kvStateRegistry = new KvStateRegistry()
-
-    val kvStateServer = Option(netConfig.nettyConfig) match {
-      case Some(nettyConfig) =>
-
-        val numNetworkThreads = if (netConfig.queryServerNetworkThreads == 0) {
-          nettyConfig.getNumberOfSlots
-        } else {
-          netConfig.queryServerNetworkThreads
-        }
-
-        val numQueryThreads = if (netConfig.queryServerQueryThreads == 0) {
-          nettyConfig.getNumberOfSlots
-        } else {
-          netConfig.queryServerQueryThreads
-        }
-
-        new KvStateServer(
-          taskManagerAddress.getAddress(),
-          netConfig.queryServerPort,
-          numNetworkThreads,
-          numQueryThreads,
-          kvStateRegistry,
-          new DisabledKvStateRequestStats())
-
-      case None => null
-    }
-
-    // we start the network first, to make sure it can allocate its buffers 
first
-    val network = new NetworkEnvironment(
-      networkBufferPool,
-      connectionManager,
-      resultPartitionManager,
-      taskEventDispatcher,
-      kvStateRegistry,
-      kvStateServer,
-      netConfig.ioMode,
-      netConfig.partitionRequestInitialBackoff,
-      netConfig.partitinRequestMaxBackoff)
-
-    network.start()
-
-    val taskManagerLocation = new TaskManagerLocation(
-      resourceID,
-      taskManagerAddress.getAddress(),
-      network.getConnectionManager().getDataPort())
-
-    // computing the amount of memory to use depends on how much memory is 
available
-    // it strictly needs to happen AFTER the network stack has been initialized
-
-    // check if a value has been configured
-    val configuredMemory = 
configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
-    checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, 
configuredMemory,
-                         ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
-                         "MemoryManager needs at least one MB of memory. " +
-                           "If you leave this config parameter empty, the 
system automatically " +
-                           "pick a fraction of the available memory.")
-
-
-    val preAllocateMemory = configuration.getBoolean(
-      ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE)
-
-    val memorySize = if (configuredMemory > 0) {
-      if (preAllocateMemory) {
-        LOG.info(s"Using $configuredMemory MB for managed memory.")
-      } else {
-        LOG.info(s"Limiting managed memory to $configuredMemory MB, " +
-                   s"memory will be allocated lazily.")
-      }
-      configuredMemory << 20 // megabytes to bytes
-    }
-    else {
-      val fraction = configuration.getFloat(
-        ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-        ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
-      checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction,
-                           ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-                           "MemoryManager fraction of the free memory must be 
between 0.0 and 1.0")
-
-      if (memType == MemoryType.HEAP) {
-        val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-          fraction).toLong
-
-        if (preAllocateMemory) {
-          LOG.info(s"Using $fraction of the currently free heap space for 
managed " +
-                     s"heap memory (${relativeMemSize >> 20} MB).")
-        } else {
-          LOG.info(s"Limiting managed memory to $fraction of the currently 
free heap space " +
-                     s"(${relativeMemSize >> 20} MB), memory will be allocated 
lazily.")
-        }
-
-        relativeMemSize
-      }
-      else if (memType == MemoryType.OFF_HEAP) {
-
-        // The maximum heap memory has been adjusted according to the fraction
-        val maxMemory = EnvironmentInformation.getMaxJvmHeapMemory()
-        val directMemorySize = (maxMemory / (1.0 - fraction) * fraction).toLong
-
-        if (preAllocateMemory) {
-          LOG.info(s"Using $fraction of the maximum memory size for " +
-                     s"managed off-heap memory (${directMemorySize >> 20} 
MB).")
-        } else {
-          LOG.info(s"Limiting managed memory to $fraction of the maximum 
memory size " +
-                     s"(${directMemorySize >> 20} MB), memory will be 
allocated lazily.")
-        }
-
-        directMemorySize
-      }
-      else {
-        throw new RuntimeException("No supported memory type detected.")
-      }
-    }
-
-    // now start the memory manager
-    val memoryManager = try {
-      new MemoryManager(
-        memorySize,
-        taskManagerConfig.numberOfSlots,
-        netConfig.networkBufferSize,
-        memType,
-        preAllocateMemory)
-    }
-    catch {
-      case e: OutOfMemoryError =>
-        memType match {
-          case MemoryType.HEAP =>
-            throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
-                      s" while allocating the TaskManager heap memory 
($memorySize bytes).", e)
-
-          case MemoryType.OFF_HEAP =>
-            throw new Exception(s"OutOfMemory error (${e.getMessage()})" +
-                      s" while allocating the TaskManager off-heap memory 
($memorySize bytes). " +
-                      s"Try increasing the maximum direct memory 
(-XX:MaxDirectMemorySize)", e)
-
-          case _ => throw e
-        }
-    }
-
-    // start the I/O manager last, it will create some temp directories.
-    val ioManager: IOManager = new 
IOManagerAsync(taskManagerConfig.tmpDirPaths)
-
-    val leaderRetrievalService = leaderRetrievalServiceOption match {
-      case Some(lrs) => lrs
-      case None => 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
-    }
-
-    val metricsRegistry = new FlinkMetricRegistry(
-      MetricRegistryConfiguration.fromConfiguration(configuration))
-
-    (taskManagerConfig,
-      taskManagerLocation,
-      memoryManager,
-      ioManager,
-      network,
-      leaderRetrievalService,
-      metricsRegistry)
-  }
-
-
   // --------------------------------------------------------------------------
   //  Resolving the TaskManager actor
   // --------------------------------------------------------------------------
@@ -2140,239 +1953,6 @@ object TaskManager {
   // --------------------------------------------------------------------------
 
   /**
-   * Utility method to extract TaskManager config parameters from the 
configuration and to
-   * sanity check them.
-   *
-   * @param configuration The configuration.
-   * @param taskManagerHostname The host name under which the TaskManager 
communicates.
-   * @param localTaskManagerCommunication True, to skip initializing the 
network stack.
-   *                                      Use only in cases where only one 
task manager runs.
-   * @return A tuple (TaskManagerConfiguration, network configuration, inet 
socket address,
-    *         memory tyep).
-   */
-  @throws(classOf[IllegalArgumentException])
-  def parseTaskManagerConfiguration(
-      configuration: Configuration,
-      taskManagerHostname: String,
-      localTaskManagerCommunication: Boolean)
-    : (TaskManagerConfiguration,
-     NetworkEnvironmentConfiguration,
-     InetSocketAddress,
-     MemoryType) = {
-
-    // ------- read values from the config and check them ---------
-    //                      (a lot of them)
-
-    // ----> hosts / ports for communication and data exchange
-
-    val dataport = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
-
-    checkConfigParameter(dataport >= 0, dataport, 
ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-      "Leave config parameter empty or use 0 to let the system choose a port 
automatically.")
-
-    val taskManagerAddress = InetAddress.getByName(taskManagerHostname)
-    val taskManagerInetSocketAddress = new 
InetSocketAddress(taskManagerAddress, dataport)
-
-    // ----> memory / network stack (shuffles/broadcasts), task slots, temp 
directories
-
-    // we need this because many configs have been written with a "-1" entry
-    val slots = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) match {
-      case -1 => 1
-      case x => x
-    }
-
-    checkConfigParameter(slots >= 1, slots, 
ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
-      "Number of task slots must be at least one.")
-
-    val numNetworkBuffers = configuration.getInteger(
-      ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
-
-    checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
-      ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)
-    
-    val pageSize: Int = configuration.getInteger(
-      ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
-
-    // check page size of for minimum size
-    checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
-      ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-      "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE)
-
-    // check page size for power of two
-    checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
-      ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-      "Memory segment size must be a power of 2.")
-    
-    // check whether we use heap or off-heap memory
-    val memType: MemoryType = 
-      if 
(configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, 
false)) {
-        MemoryType.OFF_HEAP
-      } else {
-        MemoryType.HEAP
-      }
-    
-    // initialize the memory segment factory accordingly
-    memType match {
-      case MemoryType.HEAP =>
-        if 
(!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
-          throw new Exception("Memory type is set to heap memory, but memory 
segment " +
-            "factory has been initialized for off-heap memory segments")
-        }
-
-      case MemoryType.OFF_HEAP =>
-        if 
(!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) 
{
-          throw new Exception("Memory type is set to off-heap memory, but 
memory segment " +
-            "factory has been initialized for heap memory segments")
-        }
-    }
-    
-    val tmpDirs = configuration.getString(
-      ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH)
-    .split(",|" + File.pathSeparator)
-
-    val nettyConfig = if (localTaskManagerCommunication) {
-      None
-    } else {
-      Some(
-        new NettyConfig(
-          taskManagerInetSocketAddress.getAddress(),
-          taskManagerInetSocketAddress.getPort(),
-          pageSize,
-          slots,
-          configuration)
-      )
-    }
-
-    // Default spill I/O mode for intermediate results
-    val syncOrAsync = configuration.getString(
-      ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
-      ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE)
-
-    val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else 
IOMode.SYNC
-
-    val queryServerPort =  configuration.getInteger(
-      ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
-      ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT)
-
-    val queryServerNetworkThreads =  configuration.getInteger(
-      ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
-      ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS)
-
-    val queryServerQueryThreads =  configuration.getInteger(
-      ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
-      ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS)
-
-    val networkConfig = NetworkEnvironmentConfiguration(
-      numNetworkBuffers,
-      pageSize,
-      memType,
-      ioMode,
-      queryServerPort,
-      queryServerNetworkThreads,
-      queryServerQueryThreads,
-      nettyConfig.getOrElse(null))
-
-    // ----> timeouts, library caching, profiling
-
-    val timeout = try {
-      AkkaUtils.getTimeout(configuration)
-    } catch {
-      case e: Exception => throw new IllegalArgumentException(
-        s"Invalid format for '${ConfigConstants.AKKA_ASK_TIMEOUT}'. " +
-          s"Use formats like '50 s' or '1 min' to specify the timeout.")
-    }
-    LOG.info("Messages between TaskManager and JobManager have a max timeout 
of " + timeout)
-
-    val cleanupInterval = configuration.getLong(
-      ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
-      ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
-
-    val finiteRegistrationDuration = try {
-      val maxRegistrationDuration = Duration(configuration.getString(
-        ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
-        ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION))
-
-      if (maxRegistrationDuration.isFinite()) {
-        Some(maxRegistrationDuration.asInstanceOf[FiniteDuration])
-      } else {
-        None
-      }
-    } catch {
-      case e: NumberFormatException => throw new IllegalArgumentException(
-        "Invalid format for parameter " + 
ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
-        e)
-    }
-
-    val initialRegistrationPause = try {
-      val pause = Duration(configuration.getString(
-        ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
-        ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE
-      ))
-
-      if (pause.isFinite()) {
-        pause.asInstanceOf[FiniteDuration]
-      } else {
-        throw new IllegalArgumentException(s"The initial registration pause 
must be finite: $pause")
-      }
-    } catch {
-      case e: NumberFormatException => throw new IllegalArgumentException(
-        "Invalid format for parameter " + 
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
-        e)
-    }
-
-    val maxRegistrationPause = try {
-      val pause = Duration(configuration.getString(
-        ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
-        ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE
-      ))
-
-      if (pause.isFinite()) {
-        pause.asInstanceOf[FiniteDuration]
-      } else {
-        throw new IllegalArgumentException(s"The maximum registration pause 
must be finite: $pause")
-      }
-    } catch {
-      case e: NumberFormatException => throw new IllegalArgumentException(
-        "Invalid format for parameter " + 
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
-        e)
-    }
-
-    val refusedRegistrationPause = try {
-      val pause = Duration(configuration.getString(
-        ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
-        ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE
-      ))
-
-      if (pause.isFinite()) {
-        pause.asInstanceOf[FiniteDuration]
-      } else {
-        throw new IllegalArgumentException(s"The refused registration pause 
must be finite: $pause")
-      }
-    } catch {
-      case e: NumberFormatException => throw new IllegalArgumentException(
-        "Invalid format for parameter " + 
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
-        e)
-    }
-
-    val taskManagerConfig = TaskManagerConfiguration(
-      tmpDirs,
-      cleanupInterval,
-      timeout,
-      finiteRegistrationDuration,
-      slots,
-      configuration,
-      initialRegistrationPause,
-      maxRegistrationPause,
-      refusedRegistrationPause)
-
-    (taskManagerConfig, networkConfig, taskManagerInetSocketAddress, memType)
-  }
-
-  /**
    * Gets the hostname and port of the JobManager from the configuration. Also 
checks that
    * the hostname is not null and the port non-negative.
    *
@@ -2406,71 +1986,6 @@ object TaskManager {
   // --------------------------------------------------------------------------
 
   /**
-   * Validates a condition for a config parameter and displays a standard 
exception, if the
-   * the condition does not hold.
-   *
-   * @param condition The condition that must hold. If the condition is false, 
an
-   *                  exception is thrown.
-   * @param parameter The parameter value. Will be shown in the exception 
message.
-   * @param name The name of the config parameter. Will be shown in the 
exception message.
-   * @param errorMessage The optional custom error message to append to the 
exception message.
-   * @throws IllegalConfigurationException Thrown if the condition is violated.
-   */
-  @throws(classOf[IllegalConfigurationException])
-  private def checkConfigParameter(
-      condition: Boolean,
-      parameter: Any,
-      name: String,
-      errorMessage: String = "")
-    : Unit = {
-    if (!condition) {
-      throw new IllegalConfigurationException(
-        s"Invalid configuration value for '$name' : $parameter - 
$errorMessage")
-    }
-  }
-
-  /**
-   * Validates that all the directories denoted by the strings do actually 
exist, are proper
-   * directories (not files), and are writable.
-   *
-   * @param tmpDirs The array of directory paths to check.
-   * @throws Exception Thrown if any of the directories does not exist or is 
not writable
-   *                   or is a file, rather than a directory.
-   */
-  @throws(classOf[IOException])
-  private def checkTempDirs(tmpDirs: Array[String]): Unit = {
-    tmpDirs.zipWithIndex.foreach {
-      case (dir: String, _) =>
-        val file = new File(dir)
-
-        if (!file.exists) {
-          throw new IOException(
-            s"Temporary file directory ${file.getAbsolutePath} does not 
exist.")
-        }
-        if (!file.isDirectory) {
-          throw new IOException(
-            s"Temporary file directory ${file.getAbsolutePath} is not a 
directory.")
-        }
-        if (!file.canWrite) {
-          throw new IOException(
-            s"Temporary file directory ${file.getAbsolutePath} is not 
writable.")
-        }
-
-        if (LOG.isInfoEnabled) {
-          val totalSpaceGb = file.getTotalSpace >>  30
-          val usableSpaceGb = file.getUsableSpace >> 30
-          val usablePercentage = usableSpaceGb.asInstanceOf[Double] / 
totalSpaceGb * 100
-
-          val path = file.getAbsolutePath
-
-          LOG.info(f"Temporary file directory '$path': total $totalSpaceGb GB, 
" +
-            f"usable $usableSpaceGb GB ($usablePercentage%.2f%% usable)")
-        }
-      case (_, id) => throw new IllegalArgumentException(s"Temporary file 
directory #$id is null.")
-    }
-  }
-
-  /**
    * Creates the registry of default metrics, including stats about garbage 
collection, memory
    * usage, and system CPU load.
    *

http://git-wip-us.apache.org/repos/asf/flink/blob/bce292ae/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
deleted file mode 100644
index aab3c5f..0000000
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.configuration.Configuration
-
-import scala.concurrent.duration.FiniteDuration
-
-case class TaskManagerConfiguration(
-    tmpDirPaths: Array[String],
-    cleanupInterval: Long,
-    timeout: FiniteDuration,
-    maxRegistrationDuration: Option[FiniteDuration],
-    numberOfSlots: Int,
-    configuration: Configuration,
-    initialRegistrationPause: FiniteDuration,
-    maxRegistrationPause: FiniteDuration,
-    refusedRegistrationPause: FiniteDuration) {
-
-  def this(
-      tmpDirPaths: Array[String],
-      cleanupInterval: Long,
-      timeout: FiniteDuration,
-      maxRegistrationDuration: Option[FiniteDuration],
-      numberOfSlots: Int,
-      configuration: Configuration) {
-    this (
-      tmpDirPaths,
-      cleanupInterval,
-      timeout,
-      maxRegistrationDuration,
-      numberOfSlots,
-      configuration,
-      FiniteDuration(500, TimeUnit.MILLISECONDS),
-      FiniteDuration(30, TimeUnit.SECONDS),
-      FiniteDuration(10, TimeUnit.SECONDS))
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bce292ae/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 627a25a..500d1bd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -26,6 +26,7 @@ import akka.actor.Kill;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
@@ -49,11 +50,11 @@ import 
org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
 import org.junit.Test;
 
-import scala.Option;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.net.InetAddress;
@@ -69,7 +70,7 @@ public class TaskManagerComponentsStartupShutdownTest {
        public void testComponentsStartupShutdown() {
 
                final String[] TMP_DIR = new String[] { 
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH };
-               final FiniteDuration timeout = new FiniteDuration(100, 
TimeUnit.SECONDS);
+               final Time timeout = Time.seconds(100);
                final int BUFFER_SIZE = 32 * 1024;
 
                Configuration config = new Configuration();
@@ -93,14 +94,19 @@ public class TaskManagerComponentsStartupShutdownTest {
                                
LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager),
                                StandaloneResourceManager.class);
 
+                       final int numberOfSlots = 1;
+
                        // create the components for the TaskManager manually
                        final TaskManagerConfiguration tmConfig = new 
TaskManagerConfiguration(
-                                       TMP_DIR,
-                                       1000000,
-                                       timeout,
-                                       Option.<FiniteDuration>empty(),
-                                       1,
-                                       config);
+                               numberOfSlots,
+                               TMP_DIR,
+                               timeout,
+                               null,
+                               Time.milliseconds(500),
+                               Time.seconds(30),
+                               Time.seconds(10),
+                               1000000, // cleanup interval
+                               config);
 
                        final NetworkEnvironmentConfiguration netConf = new 
NetworkEnvironmentConfiguration(
                                        32, BUFFER_SIZE, MemoryType.HEAP, 
IOManager.IOMode.SYNC, 0, 0, 0,
@@ -125,8 +131,6 @@ public class TaskManagerComponentsStartupShutdownTest {
 
                        network.start();
 
-                       final int numberOfSlots = 1;
-
                        LeaderRetrievalService leaderRetrievalService = new 
StandaloneLeaderRetrievalService(jobManager.path().toString());
 
                        MetricRegistryConfiguration metricRegistryConfiguration 
= MetricRegistryConfiguration.fromConfiguration(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/bce292ae/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 707401b..09dc5ed 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -24,7 +24,8 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.MetricRegistry
-import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, 
TaskManagerConfiguration}
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 
 import scala.language.postfixOps
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bce292ae/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 0abdd46..d311bc5 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -241,7 +241,6 @@ object TestingUtils {
     )
   }
 
-
   def createTaskManager(
       actorSystem: ActorSystem,
       jobManagerURL: String,

http://git-wip-us.apache.org/repos/asf/flink/blob/bce292ae/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
index 1010432..0f82faa 100644
--- 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -24,7 +24,8 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.MetricRegistry
-import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, 
TaskManagerLocation}
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
 
 /** [[YarnTaskManager]] implementation which mixes in the 
[[TestingTaskManagerLike]] mixin.

http://git-wip-us.apache.org/repos/asf/flink/blob/bce292ae/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index 2ab9b20..be31085 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -23,8 +23,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, 
TaskManagerConfiguration, TaskManagerLocation}
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
 
 /** An extension of the TaskManager that listens for additional YARN related
   * messages.

Reply via email to