[FLINK-8636] [flip6] Use TaskManagerServices to pass in services to TaskExecutor

Pass in the TaskExecutor services via the TaskManagerServices instead
of individually.

This closes #5456.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c8851a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c8851a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c8851a3

Branch: refs/heads/master
Commit: 4c8851a3aa8d4154e88d8a19c93535ee9ac4a6a3
Parents: f2ae241
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Sun Feb 11 23:18:06 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Feb 12 11:59:29 2018 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java      |  98 +++----
 .../runtime/taskexecutor/TaskManagerRunner.java |  27 +-
 .../taskexecutor/TaskManagerServices.java       |  59 ++++-
 .../taskexecutor/TaskExecutorITCase.java        |  47 +---
 .../runtime/taskexecutor/TaskExecutorTest.java  | 261 ++++++++-----------
 .../TaskManagerServicesBuilder.java             | 133 ++++++++++
 6 files changed, 358 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4c8851a3/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 0eaf67e..d880407 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobKey;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -41,14 +40,12 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -59,7 +56,6 @@ import 
org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -132,27 +128,14 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
        public static final String TASK_MANAGER_NAME = "taskmanager";
 
-       /** The connection information of this task manager. */
-       private final TaskManagerLocation taskManagerLocation;
-
        /** The access to the leader election and retrieval services. */
        private final HighAvailabilityServices haServices;
 
+       private final TaskManagerServices taskExecutorServices;
+
        /** The task manager configuration. */
        private final TaskManagerConfiguration taskManagerConfiguration;
 
-       /** The I/O manager component in the task manager. */
-       private final IOManager ioManager;
-
-       /** The memory manager component in the task manager. */
-       private final MemoryManager memoryManager;
-
-       /** The state manager for this task, providing state managers per slot. 
*/
-       private final TaskExecutorLocalStateStoresManager 
localStateStoresManager;
-
-       /** The network component in the task manager. */
-       private final NetworkEnvironment networkEnvironment;
-
        /** The heartbeat manager for job manager in the task manager. */
        private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
 
@@ -162,13 +145,20 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        /** The fatal error handler to use in case of a fatal error. */
        private final FatalErrorHandler fatalErrorHandler;
 
-       private final TaskManagerMetricGroup taskManagerMetricGroup;
+       private final BlobCacheService blobCacheService;
 
-       private final BroadcastVariableManager broadcastVariableManager;
+       // --------- TaskManager services --------
 
-       private final FileCache fileCache;
+       /** The connection information of this task manager. */
+       private final TaskManagerLocation taskManagerLocation;
 
-       private final BlobCacheService blobCacheService;
+       private final TaskManagerMetricGroup taskManagerMetricGroup;
+
+       /** The state manager for this task, providing state managers per slot. 
*/
+       private final TaskExecutorLocalStateStoresManager 
localStateStoresManager;
+
+       /** The network component in the task manager. */
+       private final NetworkEnvironment networkEnvironment;
 
        // --------- resource manager --------
 
@@ -193,20 +183,11 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        public TaskExecutor(
                        RpcService rpcService,
                        TaskManagerConfiguration taskManagerConfiguration,
-                       TaskManagerLocation taskManagerLocation,
-                       MemoryManager memoryManager,
-                       IOManager ioManager,
-                       TaskExecutorLocalStateStoresManager 
localStateStoresManager,
-                       NetworkEnvironment networkEnvironment,
                        HighAvailabilityServices haServices,
+                       TaskManagerServices taskExecutorServices,
                        HeartbeatServices heartbeatServices,
                        TaskManagerMetricGroup taskManagerMetricGroup,
-                       BroadcastVariableManager broadcastVariableManager,
-                       FileCache fileCache,
                        BlobCacheService blobCacheService,
-                       TaskSlotTable taskSlotTable,
-                       JobManagerTable jobManagerTable,
-                       JobLeaderService jobLeaderService,
                        FatalErrorHandler fatalErrorHandler) {
 
                super(rpcService, 
AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
@@ -214,36 +195,37 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                checkArgument(taskManagerConfiguration.getNumberSlots() > 0, 
"The number of slots has to be larger than 0.");
 
                this.taskManagerConfiguration = 
checkNotNull(taskManagerConfiguration);
-               this.taskManagerLocation = checkNotNull(taskManagerLocation);
-               this.memoryManager = checkNotNull(memoryManager);
-               this.localStateStoresManager = 
checkNotNull(localStateStoresManager);
-               this.ioManager = checkNotNull(ioManager);
-               this.networkEnvironment = checkNotNull(networkEnvironment);
+               this.taskExecutorServices = checkNotNull(taskExecutorServices);
                this.haServices = checkNotNull(haServices);
-               this.taskSlotTable = checkNotNull(taskSlotTable);
                this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
                this.taskManagerMetricGroup = 
checkNotNull(taskManagerMetricGroup);
-               this.broadcastVariableManager = 
checkNotNull(broadcastVariableManager);
-               this.fileCache = checkNotNull(fileCache);
                this.blobCacheService = checkNotNull(blobCacheService);
-               this.jobManagerTable = checkNotNull(jobManagerTable);
-               this.jobLeaderService = checkNotNull(jobLeaderService);
+
+               this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
+               this.jobManagerTable = 
taskExecutorServices.getJobManagerTable();
+               this.jobLeaderService = 
taskExecutorServices.getJobLeaderService();
+               this.taskManagerLocation = 
taskExecutorServices.getTaskManagerLocation();
+               this.localStateStoresManager = 
taskExecutorServices.getTaskStateManager();
+               this.networkEnvironment = 
taskExecutorServices.getNetworkEnvironment();
 
                this.jobManagerConnections = new HashMap<>(4);
 
+               final ResourceID resourceId = 
taskExecutorServices.getTaskManagerLocation().getResourceID();
+
                this.jobManagerHeartbeatManager = 
heartbeatServices.createHeartbeatManager(
-                       getResourceID(),
+                       resourceId,
                        new JobManagerHeartbeatListener(),
                        rpcService.getScheduledExecutor(),
                        log);
 
                this.resourceManagerHeartbeatManager = 
heartbeatServices.createHeartbeatManager(
-                       getResourceID(),
+                       resourceId,
                        new ResourceManagerHeartbeatListener(),
                        rpcService.getScheduledExecutor(),
                        log);
 
-               hardwareDescription = 
HardwareDescription.extractFromSystem(memoryManager.getMemorySize());
+               hardwareDescription = HardwareDescription.extractFromSystem(
+                       
taskExecutorServices.getMemoryManager().getMemorySize());
        }
 
        // 
------------------------------------------------------------------------
@@ -277,8 +259,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
                Throwable throwable = null;
 
-               taskSlotTable.stop();
-
                if (isConnectedToResourceManager()) {
                        resourceManagerConnection.close();
                }
@@ -295,13 +275,11 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
                resourceManagerHeartbeatManager.stop();
 
-               ioManager.shutdown();
-
-               memoryManager.shutdown();
-
-               networkEnvironment.shutdown();
-
-               fileCache.shutdown();
+               try {
+                       taskExecutorServices.shutDown();
+               } catch (Throwable t) {
+                       throwable = ExceptionUtils.firstOrSuppressed(t, 
throwable);
+               }
 
                try {
                        super.postStop();
@@ -504,17 +482,17 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                tdd.getProducedPartitions(),
                                tdd.getInputGates(),
                                tdd.getTargetSlotNumber(),
-                               memoryManager,
-                               ioManager,
-                               networkEnvironment,
-                               broadcastVariableManager,
+                               taskExecutorServices.getMemoryManager(),
+                               taskExecutorServices.getIOManager(),
+                               taskExecutorServices.getNetworkEnvironment(),
+                               
taskExecutorServices.getBroadcastVariableManager(),
                                taskStateManager,
                                taskManagerActions,
                                inputSplitProvider,
                                checkpointResponder,
                                blobCacheService,
                                libraryCache,
-                               fileCache,
+                               taskExecutorServices.getFileCache(),
                                taskManagerConfiguration,
                                taskMetricGroup,
                                resultPartitionConsumableNotifier,

http://git-wip-us.apache.org/repos/asf/flink/blob/4c8851a3/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 333ce12..3bf88d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -48,7 +48,6 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
-import org.apache.flink.util.Preconditions;
 
 import akka.actor.ActorSystem;
 import org.slf4j.Logger;
@@ -61,6 +60,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This class is the executable entry point for the task manager in yarn or 
standalone mode.
@@ -97,8 +97,8 @@ public class TaskManagerRunner implements FatalErrorHandler {
        private final TaskExecutor taskManager;
 
        public TaskManagerRunner(Configuration configuration, ResourceID 
resourceId) throws Exception {
-               this.configuration = Preconditions.checkNotNull(configuration);
-               this.resourceId = Preconditions.checkNotNull(resourceId);
+               this.configuration = checkNotNull(configuration);
+               this.resourceId = checkNotNull(resourceId);
 
                timeout = AkkaUtils.getTimeoutAsTime(configuration);
 
@@ -270,10 +270,10 @@ public class TaskManagerRunner implements 
FatalErrorHandler {
                        boolean localCommunicationOnly,
                        FatalErrorHandler fatalErrorHandler) throws Exception {
 
-               Preconditions.checkNotNull(configuration);
-               Preconditions.checkNotNull(resourceID);
-               Preconditions.checkNotNull(rpcService);
-               Preconditions.checkNotNull(highAvailabilityServices);
+               checkNotNull(configuration);
+               checkNotNull(resourceID);
+               checkNotNull(rpcService);
+               checkNotNull(highAvailabilityServices);
 
                InetAddress remoteAddress = 
InetAddress.getByName(rpcService.getAddress());
 
@@ -297,20 +297,11 @@ public class TaskManagerRunner implements 
FatalErrorHandler {
                return new TaskExecutor(
                        rpcService,
                        taskManagerConfiguration,
-                       taskManagerServices.getTaskManagerLocation(),
-                       taskManagerServices.getMemoryManager(),
-                       taskManagerServices.getIOManager(),
-                       taskManagerServices.getTaskStateManager(),
-                       taskManagerServices.getNetworkEnvironment(),
                        highAvailabilityServices,
+                       taskManagerServices,
                        heartbeatServices,
                        taskManagerMetricGroup,
-                       taskManagerServices.getBroadcastVariableManager(),
-                       taskManagerServices.getFileCache(),
                        blobCacheService,
-                       taskManagerServices.getTaskSlotTable(),
-                       taskManagerServices.getJobManagerTable(),
-                       taskManagerServices.getJobLeaderService(),
                        fatalErrorHandler);
        }
 
@@ -346,7 +337,7 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
 
                final int rpcPort = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
 
-               Preconditions.checkState(rpcPort >= 0 && rpcPort <= 65535, 
"Invalid value for " +
+               checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for 
" +
                                "'%s' (port for the TaskManager actor system) : 
%d - Leave config parameter empty or " +
                                "use 0 to let the system choose port 
automatically.",
                        ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);

http://git-wip-us.apache.org/repos/asf/flink/blob/4c8851a3/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 29984e9..0879481 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -47,6 +47,8 @@ import 
org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -60,7 +62,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
  * Container for {@link TaskExecutor} services such as the {@link 
MemoryManager}, {@link IOManager},
- * {@link NetworkEnvironment}.
+ * {@link NetworkEnvironment}. All services are exclusive to a single {@link 
TaskExecutor}.
+ * Consequently, the respective {@link TaskExecutor} is responsible for 
closing them.
  */
 public class TaskManagerServices {
        private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerServices.class);
@@ -77,7 +80,7 @@ public class TaskManagerServices {
        private final JobLeaderService jobLeaderService;
        private final TaskExecutorLocalStateStoresManager taskStateManager;
 
-       private TaskManagerServices(
+       TaskManagerServices(
                TaskManagerLocation taskManagerLocation,
                MemoryManager memoryManager,
                IOManager ioManager,
@@ -146,6 +149,58 @@ public class TaskManagerServices {
        }
 
        // 
--------------------------------------------------------------------------------------------
+       //  Shut down method
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Shuts the {@link TaskExecutor} services down.
+        */
+       public void shutDown() throws FlinkException {
+
+               Exception exception = null;
+
+               try {
+                       memoryManager.shutdown();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+               }
+
+               try {
+                       ioManager.shutdown();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+               }
+
+               try {
+                       networkEnvironment.shutdown();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+               }
+
+               try {
+                       fileCache.shutdown();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+               }
+
+               try {
+                       taskSlotTable.stop();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+               }
+
+               try {
+                       jobLeaderService.stop();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+               }
+
+               if (exception != null) {
+                       throw new FlinkException("Could not properly shut down 
the TaskManager services.", exception);
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
        //  Static factory methods for task manager services
        // 
--------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4c8851a3/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index c5d7c24..dc1d09f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -22,28 +22,24 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobCacheService;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
-import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.MockNetworkEnvironment;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
@@ -53,7 +49,6 @@ import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -122,29 +117,19 @@ public class TaskExecutorITCase extends TestLogger {
                        testingHAServices,
                        rpcService.getScheduledExecutor(),
                        Time.minutes(5L));
-               MetricRegistryImpl metricRegistry = 
mock(MetricRegistryImpl.class);
-               HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class, RETURNS_MOCKS);
+               MetricRegistry metricRegistry = NoOpMetricRegistry.INSTANCE;
+               HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 1000L);
 
                final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
                final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(taskManagerResourceId, InetAddress.getLocalHost(), 1234);
-               final MemoryManager memoryManager = mock(MemoryManager.class);
-               final IOManager ioManager = mock(IOManager.class);
-               final NetworkEnvironment networkEnvironment = 
MockNetworkEnvironment.getMock();
-               final TaskManagerMetricGroup taskManagerMetricGroup = 
mock(TaskManagerMetricGroup.class);
-               final BroadcastVariableManager broadcastVariableManager = 
mock(BroadcastVariableManager.class);
-               final FileCache fileCache = mock(FileCache.class);
                final List<ResourceProfile> resourceProfiles = 
Arrays.asList(resourceProfile);
                final TaskSlotTable taskSlotTable = new 
TaskSlotTable(resourceProfiles, new 
TimerService<AllocationID>(scheduledExecutorService, 100L));
-               final JobManagerTable jobManagerTable = new JobManagerTable();
-               final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
                final SlotManager slotManager = new SlotManager(
                        rpcService.getScheduledExecutor(),
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime());
 
-               final TaskExecutorLocalStateStoresManager taskStateManager = 
new TaskExecutorLocalStateStoresManager();
-
                ResourceManager<ResourceID> resourceManager = new 
StandaloneResourceManager(
                        rpcService,
                        FlinkResourceManager.RESOURCE_MANAGER_NAME,
@@ -158,26 +143,22 @@ public class TaskExecutorITCase extends TestLogger {
                        new ClusterInformation("localhost", 1234),
                        testingFatalErrorHandler);
 
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskManagerLocation(taskManagerLocation)
+                       .setTaskSlotTable(taskSlotTable)
+                       .build();
+
                TaskExecutor taskExecutor = new TaskExecutor(
                        rpcService,
                        taskManagerConfiguration,
-                       taskManagerLocation,
-                       memoryManager,
-                       ioManager,
-                       taskStateManager,
-                       networkEnvironment,
                        testingHAServices,
+                       taskManagerServices,
                        heartbeatServices,
-                       taskManagerMetricGroup,
-                       broadcastVariableManager,
-                       fileCache,
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                        new BlobCacheService(
                                configuration,
-                               testingHAServices.createBlobStore(),
+                               new VoidBlobStore(),
                                null),
-                       taskSlotTable,
-                       jobManagerTable,
-                       jobLeaderService,
                        testingFatalErrorHandler);
 
                JobMasterGateway jmGateway = mock(JobMasterGateway.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/4c8851a3/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 0e12c54..cbd7965 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -39,7 +38,6 @@ import 
org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
@@ -50,7 +48,6 @@ import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices
 import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
@@ -63,7 +60,6 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -76,7 +72,6 @@ import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -242,23 +237,20 @@ public class TaskExecutorTest extends TestLogger {
                
when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
                when(jobMasterGateway.getHostname()).thenReturn("localhost");
 
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskManagerLocation(taskManagerLocation)
+                       .setTaskSlotTable(taskSlotTable)
+                       .setJobLeaderService(jobLeaderService)
+                       .build();
+
                final TaskExecutor taskManager = new TaskExecutor(
                        rpc,
                        tmConfig,
-                       taskManagerLocation,
-                       mock(MemoryManager.class),
-                       mock(IOManager.class),
-                       new TaskExecutorLocalStateStoresManager(),
-                       mock(NetworkEnvironment.class),
                        haServices,
+                       taskManagerServices,
                        heartbeatServices,
-                       mock(TaskManagerMetricGroup.class),
-                       mock(BroadcastVariableManager.class),
-                       mock(FileCache.class),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                        dummyBlobCacheService,
-                       taskSlotTable,
-                       new JobManagerTable(),
-                       jobLeaderService,
                        testingFatalErrorHandler);
 
                try {
@@ -350,23 +342,19 @@ public class TaskExecutorTest extends TestLogger {
                                }
                );
 
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskManagerLocation(taskManagerLocation)
+                       .setTaskSlotTable(taskSlotTable)
+                       .build();
+
                final TaskExecutor taskManager = new TaskExecutor(
                        rpc,
                        taskManagerConfiguration,
-                       taskManagerLocation,
-                       mock(MemoryManager.class),
-                       mock(IOManager.class),
-                       new TaskExecutorLocalStateStoresManager(),
-                       mock(NetworkEnvironment.class),
                        haServices,
+                       taskManagerServices,
                        heartbeatServices,
-                       mock(TaskManagerMetricGroup.class),
-                       mock(BroadcastVariableManager.class),
-                       mock(FileCache.class),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                        dummyBlobCacheService,
-                       taskSlotTable,
-                       mock(JobManagerTable.class),
-                       mock(JobLeaderService.class),
                        testingFatalErrorHandler);
 
                try {
@@ -470,23 +458,19 @@ public class TaskExecutorTest extends TestLogger {
                        }
                );
 
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskManagerLocation(taskManagerLocation)
+                       .setTaskSlotTable(taskSlotTable)
+                       .build();
+
                final TaskExecutor taskManager = new TaskExecutor(
                        rpc,
                        taskManagerConfiguration,
-                       taskManagerLocation,
-                       mock(MemoryManager.class),
-                       mock(IOManager.class),
-                       new TaskExecutorLocalStateStoresManager(),
-                       mock(NetworkEnvironment.class),
                        haServices,
+                       taskManagerServices,
                        heartbeatServices,
-                       mock(TaskManagerMetricGroup.class),
-                       mock(BroadcastVariableManager.class),
-                       mock(FileCache.class),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                        dummyBlobCacheService,
-                       taskSlotTable,
-                       mock(JobManagerTable.class),
-                       mock(JobLeaderService.class),
                        testingFatalErrorHandler);
 
                try {
@@ -566,23 +550,19 @@ public class TaskExecutorTest extends TestLogger {
 
                final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
 
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskManagerLocation(taskManagerLocation)
+                       .setTaskSlotTable(taskSlotTable)
+                       .build();
+
                TaskExecutor taskManager = new TaskExecutor(
                        rpc,
                        taskManagerServicesConfiguration,
-                       taskManagerLocation,
-                       mock(MemoryManager.class),
-                       mock(IOManager.class),
-                       new TaskExecutorLocalStateStoresManager(),
-                       mock(NetworkEnvironment.class),
                        haServices,
-                       mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(TaskManagerMetricGroup.class),
-                       mock(BroadcastVariableManager.class),
-                       mock(FileCache.class),
+                       taskManagerServices,
+                       new HeartbeatServices(1000L, 1000L),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                        dummyBlobCacheService,
-                       taskSlotTable,
-                       mock(JobManagerTable.class),
-                       mock(JobLeaderService.class),
                        testingFatalErrorHandler);
 
                try {
@@ -650,23 +630,19 @@ public class TaskExecutorTest extends TestLogger {
 
                final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
 
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskManagerLocation(taskManagerLocation)
+                       .setTaskSlotTable(taskSlotTable)
+                       .build();
+
                TaskExecutor taskManager = new TaskExecutor(
                        rpc,
                        taskManagerServicesConfiguration,
-                       taskManagerLocation,
-                       mock(MemoryManager.class),
-                       mock(IOManager.class),
-                       new TaskExecutorLocalStateStoresManager(),
-                       mock(NetworkEnvironment.class),
                        haServices,
-                       mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(TaskManagerMetricGroup.class),
-                       mock(BroadcastVariableManager.class),
-                       mock(FileCache.class),
+                       taskManagerServices,
+                       new HeartbeatServices(1000L, 1000L),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                        dummyBlobCacheService,
-                       taskSlotTable,
-                       mock(JobManagerTable.class),
-                       mock(JobLeaderService.class),
                        testingFatalErrorHandler);
 
                try {
@@ -791,23 +767,20 @@ public class TaskExecutorTest extends TestLogger {
 
                final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
 
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setNetworkEnvironment(networkEnvironment)
+                       .setTaskSlotTable(taskSlotTable)
+                       .setJobManagerTable(jobManagerTable)
+                       .build();
+
                TaskExecutor taskManager = new TaskExecutor(
                        rpc,
                        taskManagerConfiguration,
-                       mock(TaskManagerLocation.class),
-                       mock(MemoryManager.class),
-                       mock(IOManager.class),
-                       new TaskExecutorLocalStateStoresManager(),
-                       networkEnvironment,
                        haServices,
-                       mock(HeartbeatServices.class, RETURNS_MOCKS),
+                       taskManagerServices,
+                       new HeartbeatServices(1000L, 1000L),
                        taskManagerMetricGroup,
-                       mock(BroadcastVariableManager.class),
-                       mock(FileCache.class),
                        dummyBlobCacheService,
-                       taskSlotTable,
-                       jobManagerTable,
-                       mock(JobLeaderService.class),
                        testingFatalErrorHandler);
 
                try {
@@ -912,23 +885,21 @@ public class TaskExecutorTest extends TestLogger {
                final SlotID slotId = new SlotID(resourceId, 0);
                final SlotOffer slotOffer = new SlotOffer(allocationId, 0, 
ResourceProfile.UNKNOWN);
 
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskManagerLocation(taskManagerLocation)
+                       .setTaskSlotTable(taskSlotTable)
+                       .setJobManagerTable(jobManagerTable)
+                       .setJobLeaderService(jobLeaderService)
+                       .build();
+
                TaskExecutor taskManager = new TaskExecutor(
                        rpc,
                        taskManagerConfiguration,
-                       taskManagerLocation,
-                       mock(MemoryManager.class),
-                       mock(IOManager.class),
-                       new TaskExecutorLocalStateStoresManager(),
-                       mock(NetworkEnvironment.class),
                        haServices,
-                       mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(TaskManagerMetricGroup.class),
-                       mock(BroadcastVariableManager.class),
-                       mock(FileCache.class),
+                       taskManagerServices,
+                       new HeartbeatServices(1000L, 1000L),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                        dummyBlobCacheService,
-                       taskSlotTable,
-                       jobManagerTable,
-                       jobLeaderService,
                        testingFatalErrorHandler);
 
                try {
@@ -1032,23 +1003,21 @@ public class TaskExecutorTest extends TestLogger {
                rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
                rpc.registerGateway(jobManagerAddress, jobMasterGateway);
 
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskManagerLocation(taskManagerLocation)
+                       .setTaskSlotTable(taskSlotTable)
+                       .setJobManagerTable(jobManagerTable)
+                       .setJobLeaderService(jobLeaderService)
+                       .build();
+
                TaskExecutor taskManager = new TaskExecutor(
                        rpc,
                        taskManagerConfiguration,
-                       taskManagerLocation,
-                       mock(MemoryManager.class),
-                       mock(IOManager.class),
-                       new TaskExecutorLocalStateStoresManager(),
-                       mock(NetworkEnvironment.class),
                        haServices,
-                       mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(TaskManagerMetricGroup.class),
-                       mock(BroadcastVariableManager.class),
-                       mock(FileCache.class),
+                       taskManagerServices,
+                       new HeartbeatServices(1000L, 1000L),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                        dummyBlobCacheService,
-                       taskSlotTable,
-                       jobManagerTable,
-                       jobLeaderService,
                        testingFatalErrorHandler);
 
                try {
@@ -1124,27 +1093,22 @@ public class TaskExecutorTest extends TestLogger {
                final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
                
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(new 
SlotReport());
                when(taskSlotTable.getCurrentAllocation(1)).thenReturn(new 
AllocationID());
+               when(rmGateway1.registerTaskExecutor(anyString(), 
eq(resourceID), any(SlotReport.class), anyInt(), 
any(HardwareDescription.class), any(Time.class))).thenReturn(
+               CompletableFuture.completedFuture(new 
TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), 1000L, 
new ClusterInformation("localhost", 1234))));
 
-                       when(rmGateway1.registerTaskExecutor(anyString(), 
eq(resourceID), any(SlotReport.class), anyInt(), 
any(HardwareDescription.class), any(Time.class))).thenReturn(
-                       CompletableFuture.completedFuture(new 
TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), 1000L, 
new ClusterInformation("localhost", 1234))));
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskManagerLocation(taskManagerLocation)
+                       .setTaskSlotTable(taskSlotTable)
+                       .build();
 
                TaskExecutor taskManager = new TaskExecutor(
                        rpc,
                        taskManagerServicesConfiguration,
-                       taskManagerLocation,
-                       mock(MemoryManager.class),
-                       mock(IOManager.class),
-                       new TaskExecutorLocalStateStoresManager(),
-                       mock(NetworkEnvironment.class),
                        haServices,
-                       mock(HeartbeatServices.class, RETURNS_MOCKS),
-                       mock(TaskManagerMetricGroup.class),
-                       mock(BroadcastVariableManager.class),
-                       mock(FileCache.class),
+                       taskManagerServices,
+                       new HeartbeatServices(1000L, 1000L),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                        dummyBlobCacheService,
-                       taskSlotTable,
-                       mock(JobManagerTable.class),
-                       mock(JobLeaderService.class),
                        testingFatalErrorHandler);
 
                try {
@@ -1300,23 +1264,22 @@ public class TaskExecutorTest extends TestLogger {
 
                final NetworkEnvironment networkMock = 
mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
 
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskManagerLocation(taskManagerLocation)
+                       .setNetworkEnvironment(networkMock)
+                       .setTaskSlotTable(taskSlotTable)
+                       .setJobLeaderService(jobLeaderService)
+                       .setJobManagerTable(jobManagerTable)
+                       .build();
+
                final TaskExecutor taskManager = new TaskExecutor(
                        rpc,
                        taskManagerConfiguration,
-                       taskManagerLocation,
-                       mock(MemoryManager.class),
-                       mock(IOManager.class),
-                       new TaskExecutorLocalStateStoresManager(),
-                       networkMock,
                        haServices,
-                       mock(HeartbeatServices.class, RETURNS_MOCKS),
+                       taskManagerServices,
+                       new HeartbeatServices(1000L, 1000L),
                        taskManagerMetricGroup,
-                       mock(BroadcastVariableManager.class),
-                       mock(FileCache.class),
                        dummyBlobCacheService,
-                       taskSlotTable,
-                       jobManagerTable,
-                       jobLeaderService,
                        testingFatalErrorHandler);
 
                try {
@@ -1424,23 +1387,20 @@ public class TaskExecutorTest extends TestLogger {
                final JMTMRegistrationSuccess registrationMessage = new 
JMTMRegistrationSuccess(ResourceID.generate());
                final JobManagerTable jobManagerTableMock = spy(new 
JobManagerTable());
 
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskManagerLocation(taskManagerLocation)
+                       .setJobManagerTable(jobManagerTableMock)
+                       .setJobLeaderService(jobLeaderService)
+                       .build();
+
                final TaskExecutor taskExecutor = new TaskExecutor(
                        rpc,
                        taskManagerConfiguration,
-                       taskManagerLocation,
-                       mock(MemoryManager.class),
-                       mock(IOManager.class),
-                       new TaskExecutorLocalStateStoresManager(),
-                       mock(NetworkEnvironment.class),
                        haServicesMock,
+                       taskManagerServices,
                        heartbeatServicesMock,
-                       mock(TaskManagerMetricGroup.class),
-                       mock(BroadcastVariableManager.class),
-                       mock(FileCache.class),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                        dummyBlobCacheService,
-                       mock(TaskSlotTable.class),
-                       jobManagerTableMock,
-                       jobLeaderService,
                        testingFatalErrorHandler);
 
                try {
@@ -1504,23 +1464,19 @@ public class TaskExecutorTest extends TestLogger {
 
                rpc.registerGateway(rmAddress, rmGateway);
 
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskManagerLocation(taskManagerLocation)
+                       .setTaskSlotTable(taskSlotTable)
+                       .build();
+
                final TaskExecutor taskExecutor = new TaskExecutor(
                        rpc,
                        taskManagerConfiguration,
-                       taskManagerLocation,
-                       mock(MemoryManager.class),
-                       mock(IOManager.class),
-                       new TaskExecutorLocalStateStoresManager(),
-                       mock(NetworkEnvironment.class),
                        haServices,
+                       taskManagerServices,
                        heartbeatServices,
-                       mock(TaskManagerMetricGroup.class),
-                       mock(BroadcastVariableManager.class),
-                       mock(FileCache.class),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                        dummyBlobCacheService,
-                       taskSlotTable,
-                       mock(JobManagerTable.class),
-                       mock(JobLeaderService.class),
                        testingFatalErrorHandler);
 
                try {
@@ -1570,23 +1526,20 @@ public class TaskExecutorTest extends TestLogger {
                final TestingLeaderRetrievalService 
resourceManagerLeaderRetriever = new TestingLeaderRetrievalService();
                
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
 
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskManagerLocation(localTaskManagerLocation)
+                       .setTaskSlotTable(taskSlotTable)
+                       .setJobLeaderService(jobLeaderService)
+                       .build();
+
                final TaskExecutor taskExecutor = new TaskExecutor(
                        rpc,
                        taskManagerConfiguration,
-                       localTaskManagerLocation,
-                       mock(MemoryManager.class),
-                       mock(IOManager.class),
-                       new TaskExecutorLocalStateStoresManager(),
-                       mock(NetworkEnvironment.class),
                        haServices,
+                       taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       new BroadcastVariableManager(),
-                       mock(FileCache.class),
                        dummyBlobCacheService,
-                       taskSlotTable,
-                       new JobManagerTable(),
-                       jobLeaderService,
                        testingFatalErrorHandler);
 
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/4c8851a3/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
new file mode 100644
index 0000000..2bd39f8
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Builder for the {@link TaskManagerServices}.
+ */
+public class TaskManagerServicesBuilder {
+
+       /** TaskManager services. */
+       private TaskManagerLocation taskManagerLocation;
+       private MemoryManager memoryManager;
+       private IOManager ioManager;
+       private NetworkEnvironment networkEnvironment;
+       private BroadcastVariableManager broadcastVariableManager;
+       private FileCache fileCache;
+       private TaskSlotTable taskSlotTable;
+       private JobManagerTable jobManagerTable;
+       private JobLeaderService jobLeaderService;
+       private TaskExecutorLocalStateStoresManager taskStateManager;
+
+       public TaskManagerServicesBuilder() {
+               taskManagerLocation = new LocalTaskManagerLocation();
+               memoryManager = new MemoryManager(
+                       MemoryManager.MIN_PAGE_SIZE,
+                       1,
+                       MemoryManager.MIN_PAGE_SIZE,
+                       MemoryType.HEAP,
+                       false);
+               ioManager = mock(IOManager.class);
+               networkEnvironment = mock(NetworkEnvironment.class);
+               broadcastVariableManager = new BroadcastVariableManager();
+               fileCache = mock(FileCache.class);
+               taskSlotTable = mock(TaskSlotTable.class);
+               jobManagerTable = new JobManagerTable();
+               jobLeaderService = new JobLeaderService(taskManagerLocation);
+               taskStateManager = new TaskExecutorLocalStateStoresManager();
+
+       }
+
+       public TaskManagerServicesBuilder 
setTaskManagerLocation(TaskManagerLocation taskManagerLocation) {
+               this.taskManagerLocation = taskManagerLocation;
+               return this;
+       }
+
+       public TaskManagerServicesBuilder setMemoryManager(MemoryManager 
memoryManager) {
+               this.memoryManager = memoryManager;
+               return this;
+       }
+
+       public TaskManagerServicesBuilder setIoManager(IOManager ioManager) {
+               this.ioManager = ioManager;
+               return this;
+       }
+
+       public TaskManagerServicesBuilder 
setNetworkEnvironment(NetworkEnvironment networkEnvironment) {
+               this.networkEnvironment = networkEnvironment;
+               return this;
+       }
+
+       public TaskManagerServicesBuilder 
setBroadcastVariableManager(BroadcastVariableManager broadcastVariableManager) {
+               this.broadcastVariableManager = broadcastVariableManager;
+               return this;
+       }
+
+       public TaskManagerServicesBuilder setFileCache(FileCache fileCache) {
+               this.fileCache = fileCache;
+               return this;
+       }
+
+       public TaskManagerServicesBuilder setTaskSlotTable(TaskSlotTable 
taskSlotTable) {
+               this.taskSlotTable = taskSlotTable;
+               return this;
+       }
+
+       public TaskManagerServicesBuilder setJobManagerTable(JobManagerTable 
jobManagerTable) {
+               this.jobManagerTable = jobManagerTable;
+               return this;
+       }
+
+       public TaskManagerServicesBuilder setJobLeaderService(JobLeaderService 
jobLeaderService) {
+               this.jobLeaderService = jobLeaderService;
+               return this;
+       }
+
+       public TaskManagerServicesBuilder 
setTaskStateManager(TaskExecutorLocalStateStoresManager taskStateManager) {
+               this.taskStateManager = taskStateManager;
+               return this;
+       }
+
+       public TaskManagerServices build() {
+               return new TaskManagerServices(
+                       taskManagerLocation,
+                       memoryManager,
+                       ioManager,
+                       networkEnvironment,
+                       broadcastVariableManager,
+                       fileCache,
+                       taskSlotTable,
+                       jobManagerTable,
+                       jobLeaderService,
+                       taskStateManager);
+       }
+}

Reply via email to