zentol commented on a change in pull request #7780: [FLINK-11593][tests] Check 
& port TaskManagerTest to new code base
URL: https://github.com/apache/flink/pull/7780#discussion_r267330448
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 ##########
 @@ -1771,7 +2415,349 @@ private TaskExecutor 
createTaskExecutor(TaskManagerServices taskManagerServices)
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                        null,
                        dummyBlobCacheService,
-                       testingFatalErrorHandler);
+                       testingFatalErrorHandler
+               );
+       }
+
+       private void runTestAfterTaskSubmission(
+               List<TaskDeploymentDescriptor> tdds,
+               Function<TaskExecutionState, CompletableFuture<Acknowledge>> 
updateTaskExecutionStateFunction,
+               BiConsumerWithException<TaskExecutor, TaskManagerServices, ?> 
testAfterSubmission)
+               throws Throwable {
+               runTestAfterTaskSubmission(tdds, this.configuration, true, 
updateTaskExecutionStateFunction, null, testAfterSubmission);
+       }
+
+       private void runTestAfterTaskSubmission(
+               List<TaskDeploymentDescriptor> tdds,
+               Configuration configuration,
+               boolean localCommunication,
+               Function<TaskExecutionState, CompletableFuture<Acknowledge>> 
updateTaskExecutionStateFunction,
+               Function<ResultPartitionID, CompletableFuture<Acknowledge>> 
scheduleOrUpdateConsumersFunction,
+               BiConsumerWithException<TaskExecutor, TaskManagerServices, ?> 
testAfterSubmission)
+               throws Throwable {
+               final JobMasterId jobMasterId = JobMasterId.generate();
+
+               final LibraryCacheManager libraryCacheManager = 
mock(LibraryCacheManager.class);
+               
when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
+
+               TestTaskManagerActions taskManagerActions = new 
TestTaskManagerActions();
+
+               final JobMasterGateway jobMasterGateway;
+               TestingJobMasterGatewayBuilder testingJobMasterGatewayBuilder =
+                       new TestingJobMasterGatewayBuilder()
+                               .setFencingTokenSupplier(() -> jobMasterId);
+               if (scheduleOrUpdateConsumersFunction != null) {
+                       testingJobMasterGatewayBuilder
+                               
.setScheduleOrUpdateConsumersFunction(scheduleOrUpdateConsumersFunction);
+               } else {
+                       testingJobMasterGatewayBuilder
+                               
.setScheduleOrUpdateConsumersFunction(resultPartitionID -> 
CompletableFuture.completedFuture(Acknowledge.get()));
+               }
+               if (updateTaskExecutionStateFunction != null) {
+                       
testingJobMasterGatewayBuilder.setUpdateTaskExecutionStateFunction(updateTaskExecutionStateFunction);
+                       jobMasterGateway = 
testingJobMasterGatewayBuilder.build();
+                       
taskManagerActions.setJobMasterGateway(jobMasterGateway);
+               } else {
+                       jobMasterGateway = 
testingJobMasterGatewayBuilder.build();
+               }
+
+               final PartitionProducerStateChecker 
partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
+               
when(partitionProducerStateChecker.requestPartitionProducerState(any(), any(), 
any()))
+                       
.thenReturn(CompletableFuture.completedFuture(ExecutionState.RUNNING));
+
+               final JobManagerConnection jobManagerConnection = new 
JobManagerConnection(
+                       jobId,
+                       ResourceID.generate(),
+                       jobMasterGateway,
+                       taskManagerActions,
+                       mock(CheckpointResponder.class),
+                       new TestGlobalAggregateManager(),
+                       libraryCacheManager,
+                       new 
RpcResultPartitionConsumableNotifier(jobMasterGateway, rpc.getExecutor(), 
timeout),
+                       partitionProducerStateChecker
+               );
+
+               final JobManagerTable jobManagerTable = new JobManagerTable();
+               jobManagerTable.put(jobId, jobManagerConnection);
+
+               Collection<ResourceProfile> resourceProfiles = new 
ArrayList<>();
+               for (int i = 0; i < tdds.size(); i++) {
+                       resourceProfiles.add(ResourceProfile.UNKNOWN);
+               }
+               if (resourceProfiles.size() == 0) {
+                       resourceProfiles.add(ResourceProfile.UNKNOWN);
+               }
+               final TaskSlotTable taskSlotTable = new 
TaskSlotTable(resourceProfiles, timerService);
+               taskManagerActions.setTaskSlotTable(taskSlotTable);
+
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       false,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
+               final ConnectionManager connectionManager;
+               if (!localCommunication) {
+                       NettyConfig nettyConfig = 
TaskManagerServicesConfiguration
+                               .fromConfiguration(configuration, 
InetAddress.getByName(rpc.getAddress()), localCommunication).getNetworkConfig()
+                               .nettyConfig();
+                       connectionManager = new 
NettyConnectionManager(nettyConfig);
+               } else {
+                       connectionManager = new LocalConnectionManager();
+               }
+
+               final int numAllBuffers = 10;
+               final NetworkEnvironment networkEnvironment = 
createTestNetworkEnvironment(
+                       numAllBuffers,
+                       128,
+                       
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL),
+                       
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX),
+                       2,
+                       8,
+                       true,
+                       connectionManager);
+               networkEnvironment.start();
+
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setNetworkEnvironment(networkEnvironment)
+                       .setTaskSlotTable(taskSlotTable)
+                       .setJobManagerTable(jobManagerTable)
+                       .setTaskStateManager(localStateStoresManager)
+                       .build();
+
+               TestingTaskExecutor taskManager = 
createTaskExecutor(taskManagerServices, configuration);
+
+               try {
+                       taskManager.start();
+                       taskManager.waitUntilStarted();
+                       for (int i = 0; i < tdds.size(); i++) {
+                               // change back to timeout
+                               taskSlotTable.allocateSlot(i, jobId, 
tdds.get(i).getAllocationId(), Time.seconds(60));
+                               final TaskExecutorGateway tmGateway = 
taskManager.getSelfGateway(TaskExecutorGateway.class);
+                               CompletableFuture<Void> completionFuture = new 
CompletableFuture<>();
+                               
taskManagerActions.putRunningStateFuture(tdds.get(i).getExecutionAttemptId(), 
completionFuture);
+                               tmGateway.submitTask(tdds.get(i), jobMasterId, 
timeout).get();
+                               completionFuture.get();
+                       }
+                       testAfterSubmission.accept(taskManager, 
taskManagerServices);
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(taskManager, timeout);
+               }
+       }
+
+       @Nonnull
+       private NetworkEnvironment createTestNetworkEnvironment(
 
 Review comment:
   unused

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to