zentol commented on a change in pull request #7780: [FLINK-11593][tests] Check
& port TaskManagerTest to new code base
URL: https://github.com/apache/flink/pull/7780#discussion_r267318175
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
##########
@@ -1771,7 +2415,349 @@ private TaskExecutor
createTaskExecutor(TaskManagerServices taskManagerServices)
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
null,
dummyBlobCacheService,
- testingFatalErrorHandler);
+ testingFatalErrorHandler
+ );
+ }
+
+ private void runTestAfterTaskSubmission(
+ List<TaskDeploymentDescriptor> tdds,
+ Function<TaskExecutionState, CompletableFuture<Acknowledge>>
updateTaskExecutionStateFunction,
+ BiConsumerWithException<TaskExecutor, TaskManagerServices, ?>
testAfterSubmission)
+ throws Throwable {
+ runTestAfterTaskSubmission(tdds, this.configuration, true,
updateTaskExecutionStateFunction, null, testAfterSubmission);
+ }
+
+ private void runTestAfterTaskSubmission(
+ List<TaskDeploymentDescriptor> tdds,
+ Configuration configuration,
+ boolean localCommunication,
+ Function<TaskExecutionState, CompletableFuture<Acknowledge>>
updateTaskExecutionStateFunction,
+ Function<ResultPartitionID, CompletableFuture<Acknowledge>>
scheduleOrUpdateConsumersFunction,
+ BiConsumerWithException<TaskExecutor, TaskManagerServices, ?>
testAfterSubmission)
+ throws Throwable {
+ final JobMasterId jobMasterId = JobMasterId.generate();
+
+ final LibraryCacheManager libraryCacheManager =
mock(LibraryCacheManager.class);
+
when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
+
+ TestTaskManagerActions taskManagerActions = new
TestTaskManagerActions();
+
+ final JobMasterGateway jobMasterGateway;
+ TestingJobMasterGatewayBuilder testingJobMasterGatewayBuilder =
+ new TestingJobMasterGatewayBuilder()
+ .setFencingTokenSupplier(() -> jobMasterId);
+ if (scheduleOrUpdateConsumersFunction != null) {
+ testingJobMasterGatewayBuilder
+
.setScheduleOrUpdateConsumersFunction(scheduleOrUpdateConsumersFunction);
+ } else {
+ testingJobMasterGatewayBuilder
+
.setScheduleOrUpdateConsumersFunction(resultPartitionID ->
CompletableFuture.completedFuture(Acknowledge.get()));
+ }
+ if (updateTaskExecutionStateFunction != null) {
+
testingJobMasterGatewayBuilder.setUpdateTaskExecutionStateFunction(updateTaskExecutionStateFunction);
+ jobMasterGateway =
testingJobMasterGatewayBuilder.build();
+
taskManagerActions.setJobMasterGateway(jobMasterGateway);
+ } else {
+ jobMasterGateway =
testingJobMasterGatewayBuilder.build();
+ }
+
+ final PartitionProducerStateChecker
partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
+
when(partitionProducerStateChecker.requestPartitionProducerState(any(), any(),
any()))
+
.thenReturn(CompletableFuture.completedFuture(ExecutionState.RUNNING));
+
+ final JobManagerConnection jobManagerConnection = new
JobManagerConnection(
+ jobId,
+ ResourceID.generate(),
+ jobMasterGateway,
+ taskManagerActions,
+ mock(CheckpointResponder.class),
+ new TestGlobalAggregateManager(),
+ libraryCacheManager,
+ new
RpcResultPartitionConsumableNotifier(jobMasterGateway, rpc.getExecutor(),
timeout),
+ partitionProducerStateChecker
+ );
+
+ final JobManagerTable jobManagerTable = new JobManagerTable();
+ jobManagerTable.put(jobId, jobManagerConnection);
+
+ Collection<ResourceProfile> resourceProfiles = new
ArrayList<>();
+ for (int i = 0; i < tdds.size(); i++) {
+ resourceProfiles.add(ResourceProfile.UNKNOWN);
+ }
+ if (resourceProfiles.size() == 0) {
+ resourceProfiles.add(ResourceProfile.UNKNOWN);
+ }
+ final TaskSlotTable taskSlotTable = new
TaskSlotTable(resourceProfiles, timerService);
+ taskManagerActions.setTaskSlotTable(taskSlotTable);
+
+ TaskExecutorLocalStateStoresManager localStateStoresManager =
new TaskExecutorLocalStateStoresManager(
+ false,
+ new File[]{tmp.newFolder()},
+ Executors.directExecutor());
+
+ final ConnectionManager connectionManager;
+ if (!localCommunication) {
+ NettyConfig nettyConfig =
TaskManagerServicesConfiguration
+ .fromConfiguration(configuration,
InetAddress.getByName(rpc.getAddress()), localCommunication).getNetworkConfig()
+ .nettyConfig();
+ connectionManager = new
NettyConnectionManager(nettyConfig);
+ } else {
+ connectionManager = new LocalConnectionManager();
+ }
+
+ final int numAllBuffers = 10;
+ final NetworkEnvironment networkEnvironment =
createTestNetworkEnvironment(
+ numAllBuffers,
+ 128,
+
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL),
+
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX),
+ 2,
+ 8,
+ true,
+ connectionManager);
+ networkEnvironment.start();
+
+ final TaskManagerServices taskManagerServices = new
TaskManagerServicesBuilder()
+ .setNetworkEnvironment(networkEnvironment)
+ .setTaskSlotTable(taskSlotTable)
+ .setJobManagerTable(jobManagerTable)
+ .setTaskStateManager(localStateStoresManager)
+ .build();
+
+ TestingTaskExecutor taskManager =
createTaskExecutor(taskManagerServices, configuration);
+
+ try {
+ taskManager.start();
+ taskManager.waitUntilStarted();
+ for (int i = 0; i < tdds.size(); i++) {
+ // change back to timeout
+ taskSlotTable.allocateSlot(i, jobId,
tdds.get(i).getAllocationId(), Time.seconds(60));
+ final TaskExecutorGateway tmGateway =
taskManager.getSelfGateway(TaskExecutorGateway.class);
+ CompletableFuture<Void> completionFuture = new
CompletableFuture<>();
+
taskManagerActions.putRunningStateFuture(tdds.get(i).getExecutionAttemptId(),
completionFuture);
+ tmGateway.submitTask(tdds.get(i), jobMasterId,
timeout).get();
+ completionFuture.get();
+ }
+ testAfterSubmission.accept(taskManager,
taskManagerServices);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(taskManager, timeout);
+ }
+ }
+
+ @Nonnull
+ private NetworkEnvironment createTestNetworkEnvironment(
+ int numBuffers,
+ int memorySegmentSize,
+ int partitionRequestInitialBackoff,
+ int partitionRequestMaxBackoff,
+ int networkBuffersPerChannel,
+ int extraNetworkBuffersPerGate,
+ boolean enableCreditBased,
+ ConnectionManager connectionManager
+ ) {
+ return new NetworkEnvironment(
+ new NetworkBufferPool(numBuffers, memorySegmentSize),
+ connectionManager,
+ new ResultPartitionManager(),
+ new TaskEventDispatcher(),
+ new KvStateRegistry(),
+ null,
+ null,
+ IOManager.IOMode.SYNC,
+ partitionRequestInitialBackoff,
+ partitionRequestMaxBackoff,
+ networkBuffersPerChannel,
+ extraNetworkBuffersPerGate,
+ enableCreditBased);
+ }
+
+ private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
+ String taskName,
+ JobVertexID vid,
+ ExecutionAttemptID eid,
+ Class<? extends AbstractInvokable> abstractInvokable
+ ) throws IOException {
+ return createTestTaskDeploymentDescriptor(taskName, vid, eid,
abstractInvokable, 1);
+ }
+
+ private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
+ String taskName,
+ JobVertexID vid,
+ ExecutionAttemptID eid,
+ Class<? extends AbstractInvokable> abstractInvokable,
+ int maxNumberOfSubtasks
+ ) throws IOException {
+ return createTestTaskDeploymentDescriptor(taskName, vid, eid,
abstractInvokable, maxNumberOfSubtasks, null, null);
+ }
+
+ private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
+ String taskName,
+ JobVertexID vid,
Review comment:
The `JobVertexID` isn't used in any of the tests calling this method and can
thus be auto-generated.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services