Clarkkkkk commented on a change in pull request #7780: [FLINK-11593][tests]
Check & port TaskManagerTest to new code base
URL: https://github.com/apache/flink/pull/7780#discussion_r268019967
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
##########
@@ -1771,7 +2415,349 @@ private TaskExecutor
createTaskExecutor(TaskManagerServices taskManagerServices)
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
null,
dummyBlobCacheService,
- testingFatalErrorHandler);
+ testingFatalErrorHandler
+ );
+ }
+
+ private void runTestAfterTaskSubmission(
+ List<TaskDeploymentDescriptor> tdds,
+ Function<TaskExecutionState, CompletableFuture<Acknowledge>>
updateTaskExecutionStateFunction,
+ BiConsumerWithException<TaskExecutor, TaskManagerServices, ?>
testAfterSubmission)
+ throws Throwable {
+ runTestAfterTaskSubmission(tdds, this.configuration, true,
updateTaskExecutionStateFunction, null, testAfterSubmission);
+ }
+
+ private void runTestAfterTaskSubmission(
+ List<TaskDeploymentDescriptor> tdds,
+ Configuration configuration,
+ boolean localCommunication,
+ Function<TaskExecutionState, CompletableFuture<Acknowledge>>
updateTaskExecutionStateFunction,
+ Function<ResultPartitionID, CompletableFuture<Acknowledge>>
scheduleOrUpdateConsumersFunction,
+ BiConsumerWithException<TaskExecutor, TaskManagerServices, ?>
testAfterSubmission)
+ throws Throwable {
+ final JobMasterId jobMasterId = JobMasterId.generate();
+
+ final LibraryCacheManager libraryCacheManager =
mock(LibraryCacheManager.class);
+
when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
+
+ TestTaskManagerActions taskManagerActions = new
TestTaskManagerActions();
+
+ final JobMasterGateway jobMasterGateway;
+ TestingJobMasterGatewayBuilder testingJobMasterGatewayBuilder =
+ new TestingJobMasterGatewayBuilder()
+ .setFencingTokenSupplier(() -> jobMasterId);
+ if (scheduleOrUpdateConsumersFunction != null) {
+ testingJobMasterGatewayBuilder
+
.setScheduleOrUpdateConsumersFunction(scheduleOrUpdateConsumersFunction);
+ } else {
+ testingJobMasterGatewayBuilder
+
.setScheduleOrUpdateConsumersFunction(resultPartitionID ->
CompletableFuture.completedFuture(Acknowledge.get()));
+ }
+ if (updateTaskExecutionStateFunction != null) {
+
testingJobMasterGatewayBuilder.setUpdateTaskExecutionStateFunction(updateTaskExecutionStateFunction);
+ jobMasterGateway =
testingJobMasterGatewayBuilder.build();
+
taskManagerActions.setJobMasterGateway(jobMasterGateway);
+ } else {
+ jobMasterGateway =
testingJobMasterGatewayBuilder.build();
+ }
+
+ final PartitionProducerStateChecker
partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
+
when(partitionProducerStateChecker.requestPartitionProducerState(any(), any(),
any()))
+
.thenReturn(CompletableFuture.completedFuture(ExecutionState.RUNNING));
+
+ final JobManagerConnection jobManagerConnection = new
JobManagerConnection(
+ jobId,
+ ResourceID.generate(),
+ jobMasterGateway,
+ taskManagerActions,
+ mock(CheckpointResponder.class),
+ new TestGlobalAggregateManager(),
+ libraryCacheManager,
+ new
RpcResultPartitionConsumableNotifier(jobMasterGateway, rpc.getExecutor(),
timeout),
+ partitionProducerStateChecker
+ );
+
+ final JobManagerTable jobManagerTable = new JobManagerTable();
+ jobManagerTable.put(jobId, jobManagerConnection);
+
+ Collection<ResourceProfile> resourceProfiles = new
ArrayList<>();
+ for (int i = 0; i < tdds.size(); i++) {
+ resourceProfiles.add(ResourceProfile.UNKNOWN);
+ }
+ if (resourceProfiles.size() == 0) {
+ resourceProfiles.add(ResourceProfile.UNKNOWN);
+ }
+ final TaskSlotTable taskSlotTable = new
TaskSlotTable(resourceProfiles, timerService);
+ taskManagerActions.setTaskSlotTable(taskSlotTable);
+
+ TaskExecutorLocalStateStoresManager localStateStoresManager =
new TaskExecutorLocalStateStoresManager(
+ false,
+ new File[]{tmp.newFolder()},
+ Executors.directExecutor());
+
+ final ConnectionManager connectionManager;
+ if (!localCommunication) {
+ NettyConfig nettyConfig =
TaskManagerServicesConfiguration
+ .fromConfiguration(configuration,
InetAddress.getByName(rpc.getAddress()), localCommunication).getNetworkConfig()
+ .nettyConfig();
+ connectionManager = new
NettyConnectionManager(nettyConfig);
+ } else {
+ connectionManager = new LocalConnectionManager();
+ }
+
+ final int numAllBuffers = 10;
+ final NetworkEnvironment networkEnvironment =
createTestNetworkEnvironment(
+ numAllBuffers,
+ 128,
+
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL),
+
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX),
+ 2,
+ 8,
+ true,
+ connectionManager);
+ networkEnvironment.start();
+
+ final TaskManagerServices taskManagerServices = new
TaskManagerServicesBuilder()
+ .setNetworkEnvironment(networkEnvironment)
+ .setTaskSlotTable(taskSlotTable)
+ .setJobManagerTable(jobManagerTable)
+ .setTaskStateManager(localStateStoresManager)
+ .build();
+
+ TestingTaskExecutor taskManager =
createTaskExecutor(taskManagerServices, configuration);
+
+ try {
+ taskManager.start();
+ taskManager.waitUntilStarted();
+ for (int i = 0; i < tdds.size(); i++) {
+ // change back to timeout
+ taskSlotTable.allocateSlot(i, jobId,
tdds.get(i).getAllocationId(), Time.seconds(60));
+ final TaskExecutorGateway tmGateway =
taskManager.getSelfGateway(TaskExecutorGateway.class);
+ CompletableFuture<Void> completionFuture = new
CompletableFuture<>();
+
taskManagerActions.putRunningStateFuture(tdds.get(i).getExecutionAttemptId(),
completionFuture);
+ tmGateway.submitTask(tdds.get(i), jobMasterId,
timeout).get();
+ completionFuture.get();
+ }
+ testAfterSubmission.accept(taskManager,
taskManagerServices);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(taskManager, timeout);
+ }
+ }
+
+ @Nonnull
+ private NetworkEnvironment createTestNetworkEnvironment(
+ int numBuffers,
+ int memorySegmentSize,
+ int partitionRequestInitialBackoff,
+ int partitionRequestMaxBackoff,
+ int networkBuffersPerChannel,
+ int extraNetworkBuffersPerGate,
+ boolean enableCreditBased,
+ ConnectionManager connectionManager
+ ) {
+ return new NetworkEnvironment(
+ new NetworkBufferPool(numBuffers, memorySegmentSize),
+ connectionManager,
+ new ResultPartitionManager(),
+ new TaskEventDispatcher(),
+ new KvStateRegistry(),
+ null,
+ null,
+ IOManager.IOMode.SYNC,
+ partitionRequestInitialBackoff,
+ partitionRequestMaxBackoff,
+ networkBuffersPerChannel,
+ extraNetworkBuffersPerGate,
+ enableCreditBased);
+ }
+
+ private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
+ String taskName,
+ JobVertexID vid,
+ ExecutionAttemptID eid,
+ Class<? extends AbstractInvokable> abstractInvokable
+ ) throws IOException {
+ return createTestTaskDeploymentDescriptor(taskName, vid, eid,
abstractInvokable, 1);
+ }
+
+ private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
+ String taskName,
+ JobVertexID vid,
+ ExecutionAttemptID eid,
+ Class<? extends AbstractInvokable> abstractInvokable,
+ int maxNumberOfSubtasks
+ ) throws IOException {
+ return createTestTaskDeploymentDescriptor(taskName, vid, eid,
abstractInvokable, maxNumberOfSubtasks, null, null);
+ }
+
+ private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
+ String taskName,
+ JobVertexID vid,
+ ExecutionAttemptID eid,
+ Class<? extends AbstractInvokable> abstractInvokable,
+ int maxNumberOfSubtasks,
+ Collection<ResultPartitionDeploymentDescriptor>
producedPartitions,
+ Collection<InputGateDeploymentDescriptor> inputGates
+ ) throws IOException {
+ if (producedPartitions == null) {
+ producedPartitions = Collections.emptyList();
+ }
+ if (inputGates == null) {
+ inputGates = Collections.emptyList();
+ }
+ return createTaskDeploymentDescriptor(
+ jobId, testName.getMethodName(), vid, eid,
+ new SerializedValue<>(new ExecutionConfig()), taskName,
maxNumberOfSubtasks, 0, 1, 0,
+ new Configuration(), new Configuration(),
abstractInvokable.getName(),
+ producedPartitions,
+ inputGates,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ 0);
+ }
+
+ private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
+ JobID jobId,
+ String jobName,
+ JobVertexID jobVertexId,
+ ExecutionAttemptID executionAttemptId,
+ SerializedValue<ExecutionConfig> serializedExecutionConfig,
+ String taskName,
+ int maxNumberOfSubtasks,
+ int subtaskIndex,
+ int numberOfSubtasks,
+ int attemptNumber,
+ Configuration jobConfiguration,
+ Configuration taskConfiguration,
+ String invokableClassName,
+ Collection<ResultPartitionDeploymentDescriptor>
producedPartitions,
+ Collection<InputGateDeploymentDescriptor> inputGates,
+ Collection<PermanentBlobKey> requiredJarFiles,
+ Collection<URL> requiredClasspaths,
+ int targetSlotNumber) throws IOException {
+
+ JobInformation jobInformation = new JobInformation(
+ jobId,
+ jobName,
+ serializedExecutionConfig,
+ jobConfiguration,
+ requiredJarFiles,
+ requiredClasspaths);
+
+ TaskInformation taskInformation = new TaskInformation(
+ jobVertexId,
+ taskName,
+ numberOfSubtasks,
+ maxNumberOfSubtasks,
+ invokableClassName,
+ taskConfiguration);
+
+ SerializedValue<JobInformation> serializedJobInformation = new
SerializedValue<>(jobInformation);
+ SerializedValue<TaskInformation> serializedJobVertexInformation
= new SerializedValue<>(taskInformation);
+
+ return new TaskDeploymentDescriptor(
+ jobId,
+ new
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+ new
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
+ executionAttemptId,
+ new AllocationID(),
+ subtaskIndex,
+ attemptNumber,
+ targetSlotNumber,
+ null,
+ producedPartitions,
+ inputGates);
+ }
+
+ private static final class TestTaskManagerActions implements
TaskManagerActions {
+ public static CompletableFuture<Throwable> fatalErrorFuture =
new CompletableFuture<>();
+ private Object lock = new Object();
+ private Map<ExecutionAttemptID, CompletableFuture<Void>>
runningFutures;
+ private Map<ExecutionAttemptID, CompletableFuture<Void>>
canceledFutures;
+ private Map<ExecutionAttemptID, CompletableFuture<Void>>
finishedFutures;
+ private Map<ExecutionAttemptID, CompletableFuture<Void>>
failedFutures;
+ private JobMasterGateway jobMasterGateway;
+ private TaskSlotTable taskSlotTable;
+
+ public TestTaskManagerActions() {
+ runningFutures = new HashMap<>();
+ canceledFutures = new HashMap<>();
+ finishedFutures = new HashMap<>();
+ failedFutures = new HashMap<>();
+ }
+
+ public void setJobMasterGateway(JobMasterGateway
jobMasterGateway) {
+ this.jobMasterGateway = jobMasterGateway;
+ }
+
+ public void setTaskSlotTable(TaskSlotTable taskSlotTable) {
+ this.taskSlotTable = taskSlotTable;
+ }
+
+ public void putRunningStateFuture(ExecutionAttemptID id,
CompletableFuture<Void> completableFuture) {
+ runningFutures.put(id, completableFuture);
+ }
+
+ public void putCanceledStateFuture(ExecutionAttemptID id,
CompletableFuture<Void> completableFuture) {
+ canceledFutures.put(id, completableFuture);
Review comment:
Good point. And one map should be enough.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services