[FLINK-8636] [flip6] Use TaskManagerServices to pass in services to TaskExecutor
Pass in the TaskExecutor services via the TaskManagerServices instead of individually. This closes #5456. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c8851a3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c8851a3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c8851a3 Branch: refs/heads/master Commit: 4c8851a3aa8d4154e88d8a19c93535ee9ac4a6a3 Parents: f2ae241 Author: Till Rohrmann <trohrm...@apache.org> Authored: Sun Feb 11 23:18:06 2018 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Mon Feb 12 11:59:29 2018 +0100 ---------------------------------------------------------------------- .../runtime/taskexecutor/TaskExecutor.java | 98 +++---- .../runtime/taskexecutor/TaskManagerRunner.java | 27 +- .../taskexecutor/TaskManagerServices.java | 59 ++++- .../taskexecutor/TaskExecutorITCase.java | 47 +--- .../runtime/taskexecutor/TaskExecutorTest.java | 261 ++++++++----------- .../TaskManagerServicesBuilder.java | 133 ++++++++++ 6 files changed, 358 insertions(+), 267 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4c8851a3/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 0eaf67e..d880407 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.blob.TransientBlobCache; import org.apache.flink.runtime.blob.TransientBlobKey; -import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; import org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -41,14 +40,12 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.PartitionInfo; import org.apache.flink.runtime.executiongraph.TaskInformation; -import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.heartbeat.HeartbeatListener; import org.apache.flink.runtime.heartbeat.HeartbeatManager; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.HeartbeatTarget; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.HardwareDescription; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; @@ -59,7 +56,6 @@ import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; -import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; @@ -132,27 +128,14 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { public static final String TASK_MANAGER_NAME = "taskmanager"; - /** The connection information of this task manager. */ - private final TaskManagerLocation taskManagerLocation; - /** The access to the leader election and retrieval services. */ private final HighAvailabilityServices haServices; + private final TaskManagerServices taskExecutorServices; + /** The task manager configuration. */ private final TaskManagerConfiguration taskManagerConfiguration; - /** The I/O manager component in the task manager. */ - private final IOManager ioManager; - - /** The memory manager component in the task manager. */ - private final MemoryManager memoryManager; - - /** The state manager for this task, providing state managers per slot. */ - private final TaskExecutorLocalStateStoresManager localStateStoresManager; - - /** The network component in the task manager. */ - private final NetworkEnvironment networkEnvironment; - /** The heartbeat manager for job manager in the task manager. */ private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager; @@ -162,13 +145,20 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { /** The fatal error handler to use in case of a fatal error. */ private final FatalErrorHandler fatalErrorHandler; - private final TaskManagerMetricGroup taskManagerMetricGroup; + private final BlobCacheService blobCacheService; - private final BroadcastVariableManager broadcastVariableManager; + // --------- TaskManager services -------- - private final FileCache fileCache; + /** The connection information of this task manager. */ + private final TaskManagerLocation taskManagerLocation; - private final BlobCacheService blobCacheService; + private final TaskManagerMetricGroup taskManagerMetricGroup; + + /** The state manager for this task, providing state managers per slot. */ + private final TaskExecutorLocalStateStoresManager localStateStoresManager; + + /** The network component in the task manager. */ + private final NetworkEnvironment networkEnvironment; // --------- resource manager -------- @@ -193,20 +183,11 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { public TaskExecutor( RpcService rpcService, TaskManagerConfiguration taskManagerConfiguration, - TaskManagerLocation taskManagerLocation, - MemoryManager memoryManager, - IOManager ioManager, - TaskExecutorLocalStateStoresManager localStateStoresManager, - NetworkEnvironment networkEnvironment, HighAvailabilityServices haServices, + TaskManagerServices taskExecutorServices, HeartbeatServices heartbeatServices, TaskManagerMetricGroup taskManagerMetricGroup, - BroadcastVariableManager broadcastVariableManager, - FileCache fileCache, BlobCacheService blobCacheService, - TaskSlotTable taskSlotTable, - JobManagerTable jobManagerTable, - JobLeaderService jobLeaderService, FatalErrorHandler fatalErrorHandler) { super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME)); @@ -214,36 +195,37 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0."); this.taskManagerConfiguration = checkNotNull(taskManagerConfiguration); - this.taskManagerLocation = checkNotNull(taskManagerLocation); - this.memoryManager = checkNotNull(memoryManager); - this.localStateStoresManager = checkNotNull(localStateStoresManager); - this.ioManager = checkNotNull(ioManager); - this.networkEnvironment = checkNotNull(networkEnvironment); + this.taskExecutorServices = checkNotNull(taskExecutorServices); this.haServices = checkNotNull(haServices); - this.taskSlotTable = checkNotNull(taskSlotTable); this.fatalErrorHandler = checkNotNull(fatalErrorHandler); this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup); - this.broadcastVariableManager = checkNotNull(broadcastVariableManager); - this.fileCache = checkNotNull(fileCache); this.blobCacheService = checkNotNull(blobCacheService); - this.jobManagerTable = checkNotNull(jobManagerTable); - this.jobLeaderService = checkNotNull(jobLeaderService); + + this.taskSlotTable = taskExecutorServices.getTaskSlotTable(); + this.jobManagerTable = taskExecutorServices.getJobManagerTable(); + this.jobLeaderService = taskExecutorServices.getJobLeaderService(); + this.taskManagerLocation = taskExecutorServices.getTaskManagerLocation(); + this.localStateStoresManager = taskExecutorServices.getTaskStateManager(); + this.networkEnvironment = taskExecutorServices.getNetworkEnvironment(); this.jobManagerConnections = new HashMap<>(4); + final ResourceID resourceId = taskExecutorServices.getTaskManagerLocation().getResourceID(); + this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManager( - getResourceID(), + resourceId, new JobManagerHeartbeatListener(), rpcService.getScheduledExecutor(), log); this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager( - getResourceID(), + resourceId, new ResourceManagerHeartbeatListener(), rpcService.getScheduledExecutor(), log); - hardwareDescription = HardwareDescription.extractFromSystem(memoryManager.getMemorySize()); + hardwareDescription = HardwareDescription.extractFromSystem( + taskExecutorServices.getMemoryManager().getMemorySize()); } // ------------------------------------------------------------------------ @@ -277,8 +259,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { Throwable throwable = null; - taskSlotTable.stop(); - if (isConnectedToResourceManager()) { resourceManagerConnection.close(); } @@ -295,13 +275,11 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { resourceManagerHeartbeatManager.stop(); - ioManager.shutdown(); - - memoryManager.shutdown(); - - networkEnvironment.shutdown(); - - fileCache.shutdown(); + try { + taskExecutorServices.shutDown(); + } catch (Throwable t) { + throwable = ExceptionUtils.firstOrSuppressed(t, throwable); + } try { super.postStop(); @@ -504,17 +482,17 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { tdd.getProducedPartitions(), tdd.getInputGates(), tdd.getTargetSlotNumber(), - memoryManager, - ioManager, - networkEnvironment, - broadcastVariableManager, + taskExecutorServices.getMemoryManager(), + taskExecutorServices.getIOManager(), + taskExecutorServices.getNetworkEnvironment(), + taskExecutorServices.getBroadcastVariableManager(), taskStateManager, taskManagerActions, inputSplitProvider, checkpointResponder, blobCacheService, libraryCache, - fileCache, + taskExecutorServices.getFileCache(), taskManagerConfiguration, taskMetricGroup, resultPartitionConsumableNotifier, http://git-wip-us.apache.org/repos/asf/flink/blob/4c8851a3/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 333ce12..3bf88d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -48,7 +48,6 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; -import org.apache.flink.util.Preconditions; import akka.actor.ActorSystem; import org.slf4j.Logger; @@ -61,6 +60,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * This class is the executable entry point for the task manager in yarn or standalone mode. @@ -97,8 +97,8 @@ public class TaskManagerRunner implements FatalErrorHandler { private final TaskExecutor taskManager; public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception { - this.configuration = Preconditions.checkNotNull(configuration); - this.resourceId = Preconditions.checkNotNull(resourceId); + this.configuration = checkNotNull(configuration); + this.resourceId = checkNotNull(resourceId); timeout = AkkaUtils.getTimeoutAsTime(configuration); @@ -270,10 +270,10 @@ public class TaskManagerRunner implements FatalErrorHandler { boolean localCommunicationOnly, FatalErrorHandler fatalErrorHandler) throws Exception { - Preconditions.checkNotNull(configuration); - Preconditions.checkNotNull(resourceID); - Preconditions.checkNotNull(rpcService); - Preconditions.checkNotNull(highAvailabilityServices); + checkNotNull(configuration); + checkNotNull(resourceID); + checkNotNull(rpcService); + checkNotNull(highAvailabilityServices); InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress()); @@ -297,20 +297,11 @@ public class TaskManagerRunner implements FatalErrorHandler { return new TaskExecutor( rpcService, taskManagerConfiguration, - taskManagerServices.getTaskManagerLocation(), - taskManagerServices.getMemoryManager(), - taskManagerServices.getIOManager(), - taskManagerServices.getTaskStateManager(), - taskManagerServices.getNetworkEnvironment(), highAvailabilityServices, + taskManagerServices, heartbeatServices, taskManagerMetricGroup, - taskManagerServices.getBroadcastVariableManager(), - taskManagerServices.getFileCache(), blobCacheService, - taskManagerServices.getTaskSlotTable(), - taskManagerServices.getJobManagerTable(), - taskManagerServices.getJobLeaderService(), fatalErrorHandler); } @@ -346,7 +337,7 @@ public class TaskManagerRunner implements FatalErrorHandler { final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0); - Preconditions.checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " + + checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " + "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " + "use 0 to let the system choose port automatically.", ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort); http://git-wip-us.apache.org/repos/asf/flink/blob/4c8851a3/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 29984e9..0879481 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -47,6 +47,8 @@ import org.apache.flink.runtime.taskexecutor.slot.TimerService; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -60,7 +62,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; /** * Container for {@link TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager}, - * {@link NetworkEnvironment}. + * {@link NetworkEnvironment}. All services are exclusive to a single {@link TaskExecutor}. + * Consequently, the respective {@link TaskExecutor} is responsible for closing them. */ public class TaskManagerServices { private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class); @@ -77,7 +80,7 @@ public class TaskManagerServices { private final JobLeaderService jobLeaderService; private final TaskExecutorLocalStateStoresManager taskStateManager; - private TaskManagerServices( + TaskManagerServices( TaskManagerLocation taskManagerLocation, MemoryManager memoryManager, IOManager ioManager, @@ -146,6 +149,58 @@ public class TaskManagerServices { } // -------------------------------------------------------------------------------------------- + // Shut down method + // -------------------------------------------------------------------------------------------- + + /** + * Shuts the {@link TaskExecutor} services down. + */ + public void shutDown() throws FlinkException { + + Exception exception = null; + + try { + memoryManager.shutdown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + try { + ioManager.shutdown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + try { + networkEnvironment.shutdown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + try { + fileCache.shutdown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + try { + taskSlotTable.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + try { + jobLeaderService.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if (exception != null) { + throw new FlinkException("Could not properly shut down the TaskManager services.", exception); + } + } + + // -------------------------------------------------------------------------------------------- // Static factory methods for task manager services // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4c8851a3/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index c5d7c24..dc1d09f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -22,28 +22,24 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobCacheService; -import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.entrypoint.ClusterInformation; -import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.MockNetworkEnvironment; -import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; -import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; -import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManager; @@ -53,7 +49,6 @@ import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.TestingRpcService; -import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; @@ -122,29 +117,19 @@ public class TaskExecutorITCase extends TestLogger { testingHAServices, rpcService.getScheduledExecutor(), Time.minutes(5L)); - MetricRegistryImpl metricRegistry = mock(MetricRegistryImpl.class); - HeartbeatServices heartbeatServices = mock(HeartbeatServices.class, RETURNS_MOCKS); + MetricRegistry metricRegistry = NoOpMetricRegistry.INSTANCE; + HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L); final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(taskManagerResourceId, InetAddress.getLocalHost(), 1234); - final MemoryManager memoryManager = mock(MemoryManager.class); - final IOManager ioManager = mock(IOManager.class); - final NetworkEnvironment networkEnvironment = MockNetworkEnvironment.getMock(); - final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class); - final BroadcastVariableManager broadcastVariableManager = mock(BroadcastVariableManager.class); - final FileCache fileCache = mock(FileCache.class); final List<ResourceProfile> resourceProfiles = Arrays.asList(resourceProfile); final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, new TimerService<AllocationID>(scheduledExecutorService, 100L)); - final JobManagerTable jobManagerTable = new JobManagerTable(); - final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); final SlotManager slotManager = new SlotManager( rpcService.getScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime()); - final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(); - ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager( rpcService, FlinkResourceManager.RESOURCE_MANAGER_NAME, @@ -158,26 +143,22 @@ public class TaskExecutorITCase extends TestLogger { new ClusterInformation("localhost", 1234), testingFatalErrorHandler); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskManagerLocation(taskManagerLocation) + .setTaskSlotTable(taskSlotTable) + .build(); + TaskExecutor taskExecutor = new TaskExecutor( rpcService, taskManagerConfiguration, - taskManagerLocation, - memoryManager, - ioManager, - taskStateManager, - networkEnvironment, testingHAServices, + taskManagerServices, heartbeatServices, - taskManagerMetricGroup, - broadcastVariableManager, - fileCache, + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), new BlobCacheService( configuration, - testingHAServices.createBlobStore(), + new VoidBlobStore(), null), - taskSlotTable, - jobManagerTable, - jobLeaderService, testingFatalErrorHandler); JobMasterGateway jmGateway = mock(JobMasterGateway.class); http://git-wip-us.apache.org/repos/asf/flink/blob/4c8851a3/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 0e12c54..cbd7965 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.blob.VoidBlobStore; -import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -39,7 +38,6 @@ import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; -import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.heartbeat.HeartbeatListener; import org.apache.flink.runtime.heartbeat.HeartbeatManager; import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl; @@ -50,7 +48,6 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices; import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; @@ -63,7 +60,6 @@ import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; @@ -76,7 +72,6 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; -import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; @@ -242,23 +237,20 @@ public class TaskExecutorTest extends TestLogger { when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress); when(jobMasterGateway.getHostname()).thenReturn("localhost"); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskManagerLocation(taskManagerLocation) + .setTaskSlotTable(taskSlotTable) + .setJobLeaderService(jobLeaderService) + .build(); + final TaskExecutor taskManager = new TaskExecutor( rpc, tmConfig, - taskManagerLocation, - mock(MemoryManager.class), - mock(IOManager.class), - new TaskExecutorLocalStateStoresManager(), - mock(NetworkEnvironment.class), haServices, + taskManagerServices, heartbeatServices, - mock(TaskManagerMetricGroup.class), - mock(BroadcastVariableManager.class), - mock(FileCache.class), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), dummyBlobCacheService, - taskSlotTable, - new JobManagerTable(), - jobLeaderService, testingFatalErrorHandler); try { @@ -350,23 +342,19 @@ public class TaskExecutorTest extends TestLogger { } ); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskManagerLocation(taskManagerLocation) + .setTaskSlotTable(taskSlotTable) + .build(); + final TaskExecutor taskManager = new TaskExecutor( rpc, taskManagerConfiguration, - taskManagerLocation, - mock(MemoryManager.class), - mock(IOManager.class), - new TaskExecutorLocalStateStoresManager(), - mock(NetworkEnvironment.class), haServices, + taskManagerServices, heartbeatServices, - mock(TaskManagerMetricGroup.class), - mock(BroadcastVariableManager.class), - mock(FileCache.class), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), dummyBlobCacheService, - taskSlotTable, - mock(JobManagerTable.class), - mock(JobLeaderService.class), testingFatalErrorHandler); try { @@ -470,23 +458,19 @@ public class TaskExecutorTest extends TestLogger { } ); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskManagerLocation(taskManagerLocation) + .setTaskSlotTable(taskSlotTable) + .build(); + final TaskExecutor taskManager = new TaskExecutor( rpc, taskManagerConfiguration, - taskManagerLocation, - mock(MemoryManager.class), - mock(IOManager.class), - new TaskExecutorLocalStateStoresManager(), - mock(NetworkEnvironment.class), haServices, + taskManagerServices, heartbeatServices, - mock(TaskManagerMetricGroup.class), - mock(BroadcastVariableManager.class), - mock(FileCache.class), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), dummyBlobCacheService, - taskSlotTable, - mock(JobManagerTable.class), - mock(JobLeaderService.class), testingFatalErrorHandler); try { @@ -566,23 +550,19 @@ public class TaskExecutorTest extends TestLogger { final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskManagerLocation(taskManagerLocation) + .setTaskSlotTable(taskSlotTable) + .build(); + TaskExecutor taskManager = new TaskExecutor( rpc, taskManagerServicesConfiguration, - taskManagerLocation, - mock(MemoryManager.class), - mock(IOManager.class), - new TaskExecutorLocalStateStoresManager(), - mock(NetworkEnvironment.class), haServices, - mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(TaskManagerMetricGroup.class), - mock(BroadcastVariableManager.class), - mock(FileCache.class), + taskManagerServices, + new HeartbeatServices(1000L, 1000L), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), dummyBlobCacheService, - taskSlotTable, - mock(JobManagerTable.class), - mock(JobLeaderService.class), testingFatalErrorHandler); try { @@ -650,23 +630,19 @@ public class TaskExecutorTest extends TestLogger { final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskManagerLocation(taskManagerLocation) + .setTaskSlotTable(taskSlotTable) + .build(); + TaskExecutor taskManager = new TaskExecutor( rpc, taskManagerServicesConfiguration, - taskManagerLocation, - mock(MemoryManager.class), - mock(IOManager.class), - new TaskExecutorLocalStateStoresManager(), - mock(NetworkEnvironment.class), haServices, - mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(TaskManagerMetricGroup.class), - mock(BroadcastVariableManager.class), - mock(FileCache.class), + taskManagerServices, + new HeartbeatServices(1000L, 1000L), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), dummyBlobCacheService, - taskSlotTable, - mock(JobManagerTable.class), - mock(JobLeaderService.class), testingFatalErrorHandler); try { @@ -791,23 +767,20 @@ public class TaskExecutorTest extends TestLogger { final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setNetworkEnvironment(networkEnvironment) + .setTaskSlotTable(taskSlotTable) + .setJobManagerTable(jobManagerTable) + .build(); + TaskExecutor taskManager = new TaskExecutor( rpc, taskManagerConfiguration, - mock(TaskManagerLocation.class), - mock(MemoryManager.class), - mock(IOManager.class), - new TaskExecutorLocalStateStoresManager(), - networkEnvironment, haServices, - mock(HeartbeatServices.class, RETURNS_MOCKS), + taskManagerServices, + new HeartbeatServices(1000L, 1000L), taskManagerMetricGroup, - mock(BroadcastVariableManager.class), - mock(FileCache.class), dummyBlobCacheService, - taskSlotTable, - jobManagerTable, - mock(JobLeaderService.class), testingFatalErrorHandler); try { @@ -912,23 +885,21 @@ public class TaskExecutorTest extends TestLogger { final SlotID slotId = new SlotID(resourceId, 0); final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskManagerLocation(taskManagerLocation) + .setTaskSlotTable(taskSlotTable) + .setJobManagerTable(jobManagerTable) + .setJobLeaderService(jobLeaderService) + .build(); + TaskExecutor taskManager = new TaskExecutor( rpc, taskManagerConfiguration, - taskManagerLocation, - mock(MemoryManager.class), - mock(IOManager.class), - new TaskExecutorLocalStateStoresManager(), - mock(NetworkEnvironment.class), haServices, - mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(TaskManagerMetricGroup.class), - mock(BroadcastVariableManager.class), - mock(FileCache.class), + taskManagerServices, + new HeartbeatServices(1000L, 1000L), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), dummyBlobCacheService, - taskSlotTable, - jobManagerTable, - jobLeaderService, testingFatalErrorHandler); try { @@ -1032,23 +1003,21 @@ public class TaskExecutorTest extends TestLogger { rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); rpc.registerGateway(jobManagerAddress, jobMasterGateway); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskManagerLocation(taskManagerLocation) + .setTaskSlotTable(taskSlotTable) + .setJobManagerTable(jobManagerTable) + .setJobLeaderService(jobLeaderService) + .build(); + TaskExecutor taskManager = new TaskExecutor( rpc, taskManagerConfiguration, - taskManagerLocation, - mock(MemoryManager.class), - mock(IOManager.class), - new TaskExecutorLocalStateStoresManager(), - mock(NetworkEnvironment.class), haServices, - mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(TaskManagerMetricGroup.class), - mock(BroadcastVariableManager.class), - mock(FileCache.class), + taskManagerServices, + new HeartbeatServices(1000L, 1000L), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), dummyBlobCacheService, - taskSlotTable, - jobManagerTable, - jobLeaderService, testingFatalErrorHandler); try { @@ -1124,27 +1093,22 @@ public class TaskExecutorTest extends TestLogger { final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class); when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(new SlotReport()); when(taskSlotTable.getCurrentAllocation(1)).thenReturn(new AllocationID()); + when(rmGateway1.registerTaskExecutor(anyString(), eq(resourceID), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class))).thenReturn( + CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), 1000L, new ClusterInformation("localhost", 1234)))); - when(rmGateway1.registerTaskExecutor(anyString(), eq(resourceID), any(SlotReport.class), anyInt(), any(HardwareDescription.class), any(Time.class))).thenReturn( - CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), 1000L, new ClusterInformation("localhost", 1234)))); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskManagerLocation(taskManagerLocation) + .setTaskSlotTable(taskSlotTable) + .build(); TaskExecutor taskManager = new TaskExecutor( rpc, taskManagerServicesConfiguration, - taskManagerLocation, - mock(MemoryManager.class), - mock(IOManager.class), - new TaskExecutorLocalStateStoresManager(), - mock(NetworkEnvironment.class), haServices, - mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(TaskManagerMetricGroup.class), - mock(BroadcastVariableManager.class), - mock(FileCache.class), + taskManagerServices, + new HeartbeatServices(1000L, 1000L), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), dummyBlobCacheService, - taskSlotTable, - mock(JobManagerTable.class), - mock(JobLeaderService.class), testingFatalErrorHandler); try { @@ -1300,23 +1264,22 @@ public class TaskExecutorTest extends TestLogger { final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskManagerLocation(taskManagerLocation) + .setNetworkEnvironment(networkMock) + .setTaskSlotTable(taskSlotTable) + .setJobLeaderService(jobLeaderService) + .setJobManagerTable(jobManagerTable) + .build(); + final TaskExecutor taskManager = new TaskExecutor( rpc, taskManagerConfiguration, - taskManagerLocation, - mock(MemoryManager.class), - mock(IOManager.class), - new TaskExecutorLocalStateStoresManager(), - networkMock, haServices, - mock(HeartbeatServices.class, RETURNS_MOCKS), + taskManagerServices, + new HeartbeatServices(1000L, 1000L), taskManagerMetricGroup, - mock(BroadcastVariableManager.class), - mock(FileCache.class), dummyBlobCacheService, - taskSlotTable, - jobManagerTable, - jobLeaderService, testingFatalErrorHandler); try { @@ -1424,23 +1387,20 @@ public class TaskExecutorTest extends TestLogger { final JMTMRegistrationSuccess registrationMessage = new JMTMRegistrationSuccess(ResourceID.generate()); final JobManagerTable jobManagerTableMock = spy(new JobManagerTable()); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskManagerLocation(taskManagerLocation) + .setJobManagerTable(jobManagerTableMock) + .setJobLeaderService(jobLeaderService) + .build(); + final TaskExecutor taskExecutor = new TaskExecutor( rpc, taskManagerConfiguration, - taskManagerLocation, - mock(MemoryManager.class), - mock(IOManager.class), - new TaskExecutorLocalStateStoresManager(), - mock(NetworkEnvironment.class), haServicesMock, + taskManagerServices, heartbeatServicesMock, - mock(TaskManagerMetricGroup.class), - mock(BroadcastVariableManager.class), - mock(FileCache.class), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), dummyBlobCacheService, - mock(TaskSlotTable.class), - jobManagerTableMock, - jobLeaderService, testingFatalErrorHandler); try { @@ -1504,23 +1464,19 @@ public class TaskExecutorTest extends TestLogger { rpc.registerGateway(rmAddress, rmGateway); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskManagerLocation(taskManagerLocation) + .setTaskSlotTable(taskSlotTable) + .build(); + final TaskExecutor taskExecutor = new TaskExecutor( rpc, taskManagerConfiguration, - taskManagerLocation, - mock(MemoryManager.class), - mock(IOManager.class), - new TaskExecutorLocalStateStoresManager(), - mock(NetworkEnvironment.class), haServices, + taskManagerServices, heartbeatServices, - mock(TaskManagerMetricGroup.class), - mock(BroadcastVariableManager.class), - mock(FileCache.class), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), dummyBlobCacheService, - taskSlotTable, - mock(JobManagerTable.class), - mock(JobLeaderService.class), testingFatalErrorHandler); try { @@ -1570,23 +1526,20 @@ public class TaskExecutorTest extends TestLogger { final TestingLeaderRetrievalService resourceManagerLeaderRetriever = new TestingLeaderRetrievalService(); haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskManagerLocation(localTaskManagerLocation) + .setTaskSlotTable(taskSlotTable) + .setJobLeaderService(jobLeaderService) + .build(); + final TaskExecutor taskExecutor = new TaskExecutor( rpc, taskManagerConfiguration, - localTaskManagerLocation, - mock(MemoryManager.class), - mock(IOManager.class), - new TaskExecutorLocalStateStoresManager(), - mock(NetworkEnvironment.class), haServices, + taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - new BroadcastVariableManager(), - mock(FileCache.class), dummyBlobCacheService, - taskSlotTable, - new JobManagerTable(), - jobLeaderService, testingFatalErrorHandler); try { http://git-wip-us.apache.org/repos/asf/flink/blob/4c8851a3/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java new file mode 100644 index 0000000..2bd39f8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; +import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import static org.mockito.Mockito.mock; + +/** + * Builder for the {@link TaskManagerServices}. + */ +public class TaskManagerServicesBuilder { + + /** TaskManager services. */ + private TaskManagerLocation taskManagerLocation; + private MemoryManager memoryManager; + private IOManager ioManager; + private NetworkEnvironment networkEnvironment; + private BroadcastVariableManager broadcastVariableManager; + private FileCache fileCache; + private TaskSlotTable taskSlotTable; + private JobManagerTable jobManagerTable; + private JobLeaderService jobLeaderService; + private TaskExecutorLocalStateStoresManager taskStateManager; + + public TaskManagerServicesBuilder() { + taskManagerLocation = new LocalTaskManagerLocation(); + memoryManager = new MemoryManager( + MemoryManager.MIN_PAGE_SIZE, + 1, + MemoryManager.MIN_PAGE_SIZE, + MemoryType.HEAP, + false); + ioManager = mock(IOManager.class); + networkEnvironment = mock(NetworkEnvironment.class); + broadcastVariableManager = new BroadcastVariableManager(); + fileCache = mock(FileCache.class); + taskSlotTable = mock(TaskSlotTable.class); + jobManagerTable = new JobManagerTable(); + jobLeaderService = new JobLeaderService(taskManagerLocation); + taskStateManager = new TaskExecutorLocalStateStoresManager(); + + } + + public TaskManagerServicesBuilder setTaskManagerLocation(TaskManagerLocation taskManagerLocation) { + this.taskManagerLocation = taskManagerLocation; + return this; + } + + public TaskManagerServicesBuilder setMemoryManager(MemoryManager memoryManager) { + this.memoryManager = memoryManager; + return this; + } + + public TaskManagerServicesBuilder setIoManager(IOManager ioManager) { + this.ioManager = ioManager; + return this; + } + + public TaskManagerServicesBuilder setNetworkEnvironment(NetworkEnvironment networkEnvironment) { + this.networkEnvironment = networkEnvironment; + return this; + } + + public TaskManagerServicesBuilder setBroadcastVariableManager(BroadcastVariableManager broadcastVariableManager) { + this.broadcastVariableManager = broadcastVariableManager; + return this; + } + + public TaskManagerServicesBuilder setFileCache(FileCache fileCache) { + this.fileCache = fileCache; + return this; + } + + public TaskManagerServicesBuilder setTaskSlotTable(TaskSlotTable taskSlotTable) { + this.taskSlotTable = taskSlotTable; + return this; + } + + public TaskManagerServicesBuilder setJobManagerTable(JobManagerTable jobManagerTable) { + this.jobManagerTable = jobManagerTable; + return this; + } + + public TaskManagerServicesBuilder setJobLeaderService(JobLeaderService jobLeaderService) { + this.jobLeaderService = jobLeaderService; + return this; + } + + public TaskManagerServicesBuilder setTaskStateManager(TaskExecutorLocalStateStoresManager taskStateManager) { + this.taskStateManager = taskStateManager; + return this; + } + + public TaskManagerServices build() { + return new TaskManagerServices( + taskManagerLocation, + memoryManager, + ioManager, + networkEnvironment, + broadcastVariableManager, + fileCache, + taskSlotTable, + jobManagerTable, + jobLeaderService, + taskStateManager); + } +}