zentol commented on a change in pull request #7780: [FLINK-11593][tests] Check & port TaskManagerTest to new code base
URL: https://github.com/apache/flink/pull/7780#discussion_r267318175
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 ##########
 @@ -1771,7 +2415,349 @@ private TaskExecutor 
createTaskExecutor(TaskManagerServices taskManagerServices)
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                        null,
                        dummyBlobCacheService,
-                       testingFatalErrorHandler);
+                       testingFatalErrorHandler
+               );
+       }
+
+       private void runTestAfterTaskSubmission(
+               List<TaskDeploymentDescriptor> tdds,
+               Function<TaskExecutionState, CompletableFuture<Acknowledge>> 
updateTaskExecutionStateFunction,
+               BiConsumerWithException<TaskExecutor, TaskManagerServices, ?> 
testAfterSubmission)
+               throws Throwable {
+               runTestAfterTaskSubmission(tdds, this.configuration, true, 
updateTaskExecutionStateFunction, null, testAfterSubmission);
+       }
+
+       private void runTestAfterTaskSubmission(
+               List<TaskDeploymentDescriptor> tdds,
+               Configuration configuration,
+               boolean localCommunication,
+               Function<TaskExecutionState, CompletableFuture<Acknowledge>> 
updateTaskExecutionStateFunction,
+               Function<ResultPartitionID, CompletableFuture<Acknowledge>> 
scheduleOrUpdateConsumersFunction,
+               BiConsumerWithException<TaskExecutor, TaskManagerServices, ?> 
testAfterSubmission)
+               throws Throwable {
+               final JobMasterId jobMasterId = JobMasterId.generate();
+
+               final LibraryCacheManager libraryCacheManager = 
mock(LibraryCacheManager.class);
+               
when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
+
+               TestTaskManagerActions taskManagerActions = new 
TestTaskManagerActions();
+
+               final JobMasterGateway jobMasterGateway;
+               TestingJobMasterGatewayBuilder testingJobMasterGatewayBuilder =
+                       new TestingJobMasterGatewayBuilder()
+                               .setFencingTokenSupplier(() -> jobMasterId);
+               if (scheduleOrUpdateConsumersFunction != null) {
+                       testingJobMasterGatewayBuilder
+                               
.setScheduleOrUpdateConsumersFunction(scheduleOrUpdateConsumersFunction);
+               } else {
+                       testingJobMasterGatewayBuilder
+                               
.setScheduleOrUpdateConsumersFunction(resultPartitionID -> 
CompletableFuture.completedFuture(Acknowledge.get()));
+               }
+               if (updateTaskExecutionStateFunction != null) {
+                       
testingJobMasterGatewayBuilder.setUpdateTaskExecutionStateFunction(updateTaskExecutionStateFunction);
+                       jobMasterGateway = 
testingJobMasterGatewayBuilder.build();
+                       
taskManagerActions.setJobMasterGateway(jobMasterGateway);
+               } else {
+                       jobMasterGateway = 
testingJobMasterGatewayBuilder.build();
+               }
+
+               final PartitionProducerStateChecker 
partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
+               
when(partitionProducerStateChecker.requestPartitionProducerState(any(), any(), 
any()))
+                       
.thenReturn(CompletableFuture.completedFuture(ExecutionState.RUNNING));
+
+               final JobManagerConnection jobManagerConnection = new 
JobManagerConnection(
+                       jobId,
+                       ResourceID.generate(),
+                       jobMasterGateway,
+                       taskManagerActions,
+                       mock(CheckpointResponder.class),
+                       new TestGlobalAggregateManager(),
+                       libraryCacheManager,
+                       new 
RpcResultPartitionConsumableNotifier(jobMasterGateway, rpc.getExecutor(), 
timeout),
+                       partitionProducerStateChecker
+               );
+
+               final JobManagerTable jobManagerTable = new JobManagerTable();
+               jobManagerTable.put(jobId, jobManagerConnection);
+
+               Collection<ResourceProfile> resourceProfiles = new 
ArrayList<>();
+               for (int i = 0; i < tdds.size(); i++) {
+                       resourceProfiles.add(ResourceProfile.UNKNOWN);
+               }
+               if (resourceProfiles.size() == 0) {
+                       resourceProfiles.add(ResourceProfile.UNKNOWN);
+               }
+               final TaskSlotTable taskSlotTable = new 
TaskSlotTable(resourceProfiles, timerService);
+               taskManagerActions.setTaskSlotTable(taskSlotTable);
+
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       false,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
+               final ConnectionManager connectionManager;
+               if (!localCommunication) {
+                       NettyConfig nettyConfig = 
TaskManagerServicesConfiguration
+                               .fromConfiguration(configuration, 
InetAddress.getByName(rpc.getAddress()), localCommunication).getNetworkConfig()
+                               .nettyConfig();
+                       connectionManager = new 
NettyConnectionManager(nettyConfig);
+               } else {
+                       connectionManager = new LocalConnectionManager();
+               }
+
+               final int numAllBuffers = 10;
+               final NetworkEnvironment networkEnvironment = 
createTestNetworkEnvironment(
+                       numAllBuffers,
+                       128,
+                       
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL),
+                       
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX),
+                       2,
+                       8,
+                       true,
+                       connectionManager);
+               networkEnvironment.start();
+
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setNetworkEnvironment(networkEnvironment)
+                       .setTaskSlotTable(taskSlotTable)
+                       .setJobManagerTable(jobManagerTable)
+                       .setTaskStateManager(localStateStoresManager)
+                       .build();
+
+               TestingTaskExecutor taskManager = 
createTaskExecutor(taskManagerServices, configuration);
+
+               try {
+                       taskManager.start();
+                       taskManager.waitUntilStarted();
+                       for (int i = 0; i < tdds.size(); i++) {
+                               // change back to timeout
+                               taskSlotTable.allocateSlot(i, jobId, 
tdds.get(i).getAllocationId(), Time.seconds(60));
+                               final TaskExecutorGateway tmGateway = 
taskManager.getSelfGateway(TaskExecutorGateway.class);
+                               CompletableFuture<Void> completionFuture = new 
CompletableFuture<>();
+                               
taskManagerActions.putRunningStateFuture(tdds.get(i).getExecutionAttemptId(), 
completionFuture);
+                               tmGateway.submitTask(tdds.get(i), jobMasterId, 
timeout).get();
+                               completionFuture.get();
+                       }
+                       testAfterSubmission.accept(taskManager, 
taskManagerServices);
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(taskManager, timeout);
+               }
+       }
+
+       @Nonnull
+       private NetworkEnvironment createTestNetworkEnvironment(
+               int numBuffers,
+               int memorySegmentSize,
+               int partitionRequestInitialBackoff,
+               int partitionRequestMaxBackoff,
+               int networkBuffersPerChannel,
+               int extraNetworkBuffersPerGate,
+               boolean enableCreditBased,
+               ConnectionManager connectionManager
+       ) {
+               return new NetworkEnvironment(
+                       new NetworkBufferPool(numBuffers, memorySegmentSize),
+                       connectionManager,
+                       new ResultPartitionManager(),
+                       new TaskEventDispatcher(),
+                       new KvStateRegistry(),
+                       null,
+                       null,
+                       IOManager.IOMode.SYNC,
+                       partitionRequestInitialBackoff,
+                       partitionRequestMaxBackoff,
+                       networkBuffersPerChannel,
+                       extraNetworkBuffersPerGate,
+                       enableCreditBased);
+       }
+
+       private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
+               String taskName,
+               JobVertexID vid,
+               ExecutionAttemptID eid,
+               Class<? extends AbstractInvokable> abstractInvokable
+       ) throws IOException {
+               return createTestTaskDeploymentDescriptor(taskName, vid, eid, 
abstractInvokable, 1);
+       }
+
+       private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
+               String taskName,
+               JobVertexID vid,
+               ExecutionAttemptID eid,
+               Class<? extends AbstractInvokable> abstractInvokable,
+               int maxNumberOfSubtasks
+       ) throws IOException {
+               return createTestTaskDeploymentDescriptor(taskName, vid, eid, 
abstractInvokable, maxNumberOfSubtasks, null, null);
+       }
+
+       private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
+               String taskName,
+               JobVertexID vid,
 
 Review comment:
   The `JobVertexID` isn't used in any of the tests calling this method and can 
thus be auto-generated.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to