Clarkkkkk commented on a change in pull request #7780: [FLINK-11593][tests] 
Check & port TaskManagerTest to new code base
URL: https://github.com/apache/flink/pull/7780#discussion_r268019967
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 ##########
 @@ -1771,7 +2415,349 @@ private TaskExecutor 
createTaskExecutor(TaskManagerServices taskManagerServices)
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                        null,
                        dummyBlobCacheService,
-                       testingFatalErrorHandler);
+                       testingFatalErrorHandler
+               );
+       }
+
+       private void runTestAfterTaskSubmission(
+               List<TaskDeploymentDescriptor> tdds,
+               Function<TaskExecutionState, CompletableFuture<Acknowledge>> 
updateTaskExecutionStateFunction,
+               BiConsumerWithException<TaskExecutor, TaskManagerServices, ?> 
testAfterSubmission)
+               throws Throwable {
+               runTestAfterTaskSubmission(tdds, this.configuration, true, 
updateTaskExecutionStateFunction, null, testAfterSubmission);
+       }
+
+       private void runTestAfterTaskSubmission(
+               List<TaskDeploymentDescriptor> tdds,
+               Configuration configuration,
+               boolean localCommunication,
+               Function<TaskExecutionState, CompletableFuture<Acknowledge>> 
updateTaskExecutionStateFunction,
+               Function<ResultPartitionID, CompletableFuture<Acknowledge>> 
scheduleOrUpdateConsumersFunction,
+               BiConsumerWithException<TaskExecutor, TaskManagerServices, ?> 
testAfterSubmission)
+               throws Throwable {
+               final JobMasterId jobMasterId = JobMasterId.generate();
+
+               final LibraryCacheManager libraryCacheManager = 
mock(LibraryCacheManager.class);
+               
when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
+
+               TestTaskManagerActions taskManagerActions = new 
TestTaskManagerActions();
+
+               final JobMasterGateway jobMasterGateway;
+               TestingJobMasterGatewayBuilder testingJobMasterGatewayBuilder =
+                       new TestingJobMasterGatewayBuilder()
+                               .setFencingTokenSupplier(() -> jobMasterId);
+               if (scheduleOrUpdateConsumersFunction != null) {
+                       testingJobMasterGatewayBuilder
+                               
.setScheduleOrUpdateConsumersFunction(scheduleOrUpdateConsumersFunction);
+               } else {
+                       testingJobMasterGatewayBuilder
+                               
.setScheduleOrUpdateConsumersFunction(resultPartitionID -> 
CompletableFuture.completedFuture(Acknowledge.get()));
+               }
+               if (updateTaskExecutionStateFunction != null) {
+                       
testingJobMasterGatewayBuilder.setUpdateTaskExecutionStateFunction(updateTaskExecutionStateFunction);
+                       jobMasterGateway = 
testingJobMasterGatewayBuilder.build();
+                       
taskManagerActions.setJobMasterGateway(jobMasterGateway);
+               } else {
+                       jobMasterGateway = 
testingJobMasterGatewayBuilder.build();
+               }
+
+               final PartitionProducerStateChecker 
partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
+               
when(partitionProducerStateChecker.requestPartitionProducerState(any(), any(), 
any()))
+                       
.thenReturn(CompletableFuture.completedFuture(ExecutionState.RUNNING));
+
+               final JobManagerConnection jobManagerConnection = new 
JobManagerConnection(
+                       jobId,
+                       ResourceID.generate(),
+                       jobMasterGateway,
+                       taskManagerActions,
+                       mock(CheckpointResponder.class),
+                       new TestGlobalAggregateManager(),
+                       libraryCacheManager,
+                       new 
RpcResultPartitionConsumableNotifier(jobMasterGateway, rpc.getExecutor(), 
timeout),
+                       partitionProducerStateChecker
+               );
+
+               final JobManagerTable jobManagerTable = new JobManagerTable();
+               jobManagerTable.put(jobId, jobManagerConnection);
+
+               Collection<ResourceProfile> resourceProfiles = new 
ArrayList<>();
+               for (int i = 0; i < tdds.size(); i++) {
+                       resourceProfiles.add(ResourceProfile.UNKNOWN);
+               }
+               if (resourceProfiles.size() == 0) {
+                       resourceProfiles.add(ResourceProfile.UNKNOWN);
+               }
+               final TaskSlotTable taskSlotTable = new 
TaskSlotTable(resourceProfiles, timerService);
+               taskManagerActions.setTaskSlotTable(taskSlotTable);
+
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       false,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
+               final ConnectionManager connectionManager;
+               if (!localCommunication) {
+                       NettyConfig nettyConfig = 
TaskManagerServicesConfiguration
+                               .fromConfiguration(configuration, 
InetAddress.getByName(rpc.getAddress()), localCommunication).getNetworkConfig()
+                               .nettyConfig();
+                       connectionManager = new 
NettyConnectionManager(nettyConfig);
+               } else {
+                       connectionManager = new LocalConnectionManager();
+               }
+
+               final int numAllBuffers = 10;
+               final NetworkEnvironment networkEnvironment = 
createTestNetworkEnvironment(
+                       numAllBuffers,
+                       128,
+                       
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL),
+                       
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX),
+                       2,
+                       8,
+                       true,
+                       connectionManager);
+               networkEnvironment.start();
+
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setNetworkEnvironment(networkEnvironment)
+                       .setTaskSlotTable(taskSlotTable)
+                       .setJobManagerTable(jobManagerTable)
+                       .setTaskStateManager(localStateStoresManager)
+                       .build();
+
+               TestingTaskExecutor taskManager = 
createTaskExecutor(taskManagerServices, configuration);
+
+               try {
+                       taskManager.start();
+                       taskManager.waitUntilStarted();
+                       for (int i = 0; i < tdds.size(); i++) {
+                               // change back to timeout
+                               taskSlotTable.allocateSlot(i, jobId, 
tdds.get(i).getAllocationId(), Time.seconds(60));
+                               final TaskExecutorGateway tmGateway = 
taskManager.getSelfGateway(TaskExecutorGateway.class);
+                               CompletableFuture<Void> completionFuture = new 
CompletableFuture<>();
+                               
taskManagerActions.putRunningStateFuture(tdds.get(i).getExecutionAttemptId(), 
completionFuture);
+                               tmGateway.submitTask(tdds.get(i), jobMasterId, 
timeout).get();
+                               completionFuture.get();
+                       }
+                       testAfterSubmission.accept(taskManager, 
taskManagerServices);
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(taskManager, timeout);
+               }
+       }
+
+       @Nonnull
+       private NetworkEnvironment createTestNetworkEnvironment(
+               int numBuffers,
+               int memorySegmentSize,
+               int partitionRequestInitialBackoff,
+               int partitionRequestMaxBackoff,
+               int networkBuffersPerChannel,
+               int extraNetworkBuffersPerGate,
+               boolean enableCreditBased,
+               ConnectionManager connectionManager
+       ) {
+               return new NetworkEnvironment(
+                       new NetworkBufferPool(numBuffers, memorySegmentSize),
+                       connectionManager,
+                       new ResultPartitionManager(),
+                       new TaskEventDispatcher(),
+                       new KvStateRegistry(),
+                       null,
+                       null,
+                       IOManager.IOMode.SYNC,
+                       partitionRequestInitialBackoff,
+                       partitionRequestMaxBackoff,
+                       networkBuffersPerChannel,
+                       extraNetworkBuffersPerGate,
+                       enableCreditBased);
+       }
+
+       private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
+               String taskName,
+               JobVertexID vid,
+               ExecutionAttemptID eid,
+               Class<? extends AbstractInvokable> abstractInvokable
+       ) throws IOException {
+               return createTestTaskDeploymentDescriptor(taskName, vid, eid, 
abstractInvokable, 1);
+       }
+
+       private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
+               String taskName,
+               JobVertexID vid,
+               ExecutionAttemptID eid,
+               Class<? extends AbstractInvokable> abstractInvokable,
+               int maxNumberOfSubtasks
+       ) throws IOException {
+               return createTestTaskDeploymentDescriptor(taskName, vid, eid, 
abstractInvokable, maxNumberOfSubtasks, null, null);
+       }
+
+       private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
+               String taskName,
+               JobVertexID vid,
+               ExecutionAttemptID eid,
+               Class<? extends AbstractInvokable> abstractInvokable,
+               int maxNumberOfSubtasks,
+               Collection<ResultPartitionDeploymentDescriptor> 
producedPartitions,
+               Collection<InputGateDeploymentDescriptor> inputGates
+       ) throws IOException {
+               if (producedPartitions == null) {
+                       producedPartitions = Collections.emptyList();
+               }
+               if (inputGates == null) {
+                       inputGates = Collections.emptyList();
+               }
+               return createTaskDeploymentDescriptor(
+                       jobId, testName.getMethodName(), vid, eid,
+                       new SerializedValue<>(new ExecutionConfig()), taskName, 
maxNumberOfSubtasks, 0, 1, 0,
+                       new Configuration(), new Configuration(), 
abstractInvokable.getName(),
+                       producedPartitions,
+                       inputGates,
+                       Collections.emptyList(),
+                       Collections.emptyList(),
+                       0);
+       }
+
+       private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
+               JobID jobId,
+               String jobName,
+               JobVertexID jobVertexId,
+               ExecutionAttemptID executionAttemptId,
+               SerializedValue<ExecutionConfig> serializedExecutionConfig,
+               String taskName,
+               int maxNumberOfSubtasks,
+               int subtaskIndex,
+               int numberOfSubtasks,
+               int attemptNumber,
+               Configuration jobConfiguration,
+               Configuration taskConfiguration,
+               String invokableClassName,
+               Collection<ResultPartitionDeploymentDescriptor> 
producedPartitions,
+               Collection<InputGateDeploymentDescriptor> inputGates,
+               Collection<PermanentBlobKey> requiredJarFiles,
+               Collection<URL> requiredClasspaths,
+               int targetSlotNumber) throws IOException {
+
+               JobInformation jobInformation = new JobInformation(
+                       jobId,
+                       jobName,
+                       serializedExecutionConfig,
+                       jobConfiguration,
+                       requiredJarFiles,
+                       requiredClasspaths);
+
+               TaskInformation taskInformation = new TaskInformation(
+                       jobVertexId,
+                       taskName,
+                       numberOfSubtasks,
+                       maxNumberOfSubtasks,
+                       invokableClassName,
+                       taskConfiguration);
+
+               SerializedValue<JobInformation> serializedJobInformation = new 
SerializedValue<>(jobInformation);
+               SerializedValue<TaskInformation> serializedJobVertexInformation 
= new SerializedValue<>(taskInformation);
+
+               return new TaskDeploymentDescriptor(
+                       jobId,
+                       new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+                       new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
+                       executionAttemptId,
+                       new AllocationID(),
+                       subtaskIndex,
+                       attemptNumber,
+                       targetSlotNumber,
+                       null,
+                       producedPartitions,
+                       inputGates);
+       }
+
+       private static final class TestTaskManagerActions implements 
TaskManagerActions {
+               public static CompletableFuture<Throwable> fatalErrorFuture = 
new CompletableFuture<>();
+               private Object lock = new Object();
+               private Map<ExecutionAttemptID, CompletableFuture<Void>> 
runningFutures;
+               private Map<ExecutionAttemptID, CompletableFuture<Void>> 
canceledFutures;
+               private Map<ExecutionAttemptID, CompletableFuture<Void>> 
finishedFutures;
+               private Map<ExecutionAttemptID, CompletableFuture<Void>> 
failedFutures;
+               private JobMasterGateway jobMasterGateway;
+               private TaskSlotTable taskSlotTable;
+
+               public TestTaskManagerActions() {
+                       runningFutures = new HashMap<>();
+                       canceledFutures = new HashMap<>();
+                       finishedFutures = new HashMap<>();
+                       failedFutures = new HashMap<>();
+               }
+
+               public void setJobMasterGateway(JobMasterGateway 
jobMasterGateway) {
+                       this.jobMasterGateway = jobMasterGateway;
+               }
+
+               public void setTaskSlotTable(TaskSlotTable taskSlotTable) {
+                       this.taskSlotTable = taskSlotTable;
+               }
+
+               public void putRunningStateFuture(ExecutionAttemptID id, 
CompletableFuture<Void> completableFuture) {
+                               runningFutures.put(id, completableFuture);
+               }
+
+               public void putCanceledStateFuture(ExecutionAttemptID id, 
CompletableFuture<Void> completableFuture) {
+                               canceledFutures.put(id, completableFuture);
 
 Review comment:
   Good point. And one map should be enough.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to