dmvk commented on a change in pull request #16535:
URL: https://github.com/apache/flink/pull/16535#discussion_r676615325
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterGatewayTester.java
##########
@@ -0,0 +1,262 @@
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class JobMasterGatewayTester implements Closeable {
+
+ private static final Time TIMEOUT = Time.seconds(1);
+
+ private static TaskStateSnapshot
createNonEmptyStateSnapshot(TaskInformation taskInformation) {
+ final TaskStateSnapshot checkpointStateHandles = new
TaskStateSnapshot();
+ checkpointStateHandles.putSubtaskStateByOperatorID(
+ OperatorID.fromJobVertexID(taskInformation.getJobVertexId()),
+ OperatorSubtaskState.builder()
+ .setManagedOperatorState(
+ new OperatorStreamStateHandle(
+ Collections.emptyMap(),
+ new ByteStreamStateHandle("foobar",
new byte[0])))
+ .build());
+ return checkpointStateHandles;
+ }
+
+ private static class CheckpointCompletionHandler {
+
+ private final Map<ExecutionAttemptID, CompletableFuture<Void>>
completedAttemptFutures;
+ private final CompletableFuture<Void> completedFuture;
+
+ public CheckpointCompletionHandler(List<TaskDeploymentDescriptor>
descriptors) {
+ this.completedAttemptFutures =
+ descriptors.stream()
+
.map(TaskDeploymentDescriptor::getExecutionAttemptId)
+ .collect(
+ Collectors.toMap(
+ Function.identity(),
+ ignored -> new
CompletableFuture<>()));
+ this.completedFuture =
FutureUtils.completeAll(completedAttemptFutures.values());
+ }
+
+ void completeAttempt(ExecutionAttemptID executionAttemptId) {
+ completedAttemptFutures.get(executionAttemptId).complete(null);
+ }
+
+ CompletableFuture<Void> getCompletedFuture() {
+ return completedFuture;
+ }
+ }
+
+ private final UnresolvedTaskManagerLocation taskManagerLocation =
+ new LocalUnresolvedTaskManagerLocation();
+ private final ConcurrentMap<ExecutionAttemptID, TaskDeploymentDescriptor>
descriptors =
+ new ConcurrentHashMap<>();
+
+ private final TestingRpcService rpcService;
+ private final JobID jobId;
+ private final JobMasterGateway jobMasterGateway;
+ private final TaskExecutorGateway taskExecutorGateway;
+
+ private final CompletableFuture<ExecutionGraphInfo>
executionGraphInfoFuture;
+
+ private final CompletableFuture<List<TaskDeploymentDescriptor>>
descriptorsFuture =
+ new CompletableFuture<>();
+
+ private final ConcurrentMap<Long, CheckpointCompletionHandler> checkpoints
=
+ new ConcurrentHashMap<>();
+
+ public JobMasterGatewayTester(
+ TestingRpcService rpcService, JobID jobId, JobMasterGateway
jobMasterGateway) {
+ this.rpcService = rpcService;
+ this.jobId = jobId;
+ this.jobMasterGateway = jobMasterGateway;
+ this.taskExecutorGateway = createTaskExecutorGateway();
+ executionGraphInfoFuture = jobMasterGateway.requestJob(TIMEOUT);
+ }
+
+ public CompletableFuture<Acknowledge> transitionTo(
+ List<TaskDeploymentDescriptor> descriptors, ExecutionState state) {
+ final List<CompletableFuture<Acknowledge>> futures =
+ descriptors.stream()
+ .map(TaskDeploymentDescriptor::getExecutionAttemptId)
+ .map(
+ attemptId ->
+
jobMasterGateway.updateTaskExecutionState(
+ new
TaskExecutionState(attemptId, state)))
+ .collect(Collectors.toList());
+ return FutureUtils.completeAll(futures).thenApply(ignored ->
Acknowledge.get());
+ }
+
+ public CompletableFuture<List<TaskDeploymentDescriptor>>
deployVertices(int numSlots) {
+ return jobMasterGateway
+ .registerTaskManager(
+ taskExecutorGateway.getAddress(), taskManagerLocation,
jobId, TIMEOUT)
+ .thenCompose(ignored -> offerSlots(numSlots))
+ .thenCompose(ignored -> descriptorsFuture);
+ }
+
+ public CompletableFuture<Void> awaitCheckpoint(long checkpointId) {
+ return descriptorsFuture.thenCompose(
+ descriptors ->
+ checkpoints
+ .computeIfAbsent(
+ checkpointId,
+ key -> new
CheckpointCompletionHandler(descriptors))
+ .getCompletedFuture());
+ }
+
+ @Override
+ public void close() throws IOException {
+ rpcService.unregisterGateway(taskExecutorGateway.getAddress());
+ }
+
+ private TaskExecutorGateway createTaskExecutorGateway() {
+ final AtomicReference<TaskExecutorGateway>
taskExecutorGatewayReference =
+ new AtomicReference<>();
+ final TestingTaskExecutorGateway taskExecutorGateway =
+ new TestingTaskExecutorGatewayBuilder()
+ .setSubmitTaskConsumer(this::onSubmitTaskConsumer)
+
.setTriggerCheckpointFunction(this::onTriggerCheckpoint)
+
.setConfirmCheckpointFunction(this::onConfirmCheckpoint)
+ .createTestingTaskExecutorGateway();
+ taskExecutorGatewayReference.set(taskExecutorGateway);
Review comment:
👍 We don't, it's not used anymore
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]