zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r926285695


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java:
##########
@@ -28,14 +28,18 @@ public class ResourceOverview implements Serializable {
     private static final long serialVersionUID = 7618746920569224557L;
 
     private static final ResourceOverview EMPTY_RESOURCE_OVERVIEW =
-            new ResourceOverview(0, 0, 0, ResourceProfile.ZERO, 
ResourceProfile.ZERO);
+            new ResourceOverview(0, 0, 0, 0, 0, ResourceProfile.ZERO, 
ResourceProfile.ZERO);
 
     private final int numberTaskManagers;
 
     private final int numberRegisteredSlots;
 
     private final int numberFreeSlots;
 
+    private final int numberBlockedTaskManagaers;

Review Comment:
   numberBlockedTaskManagaers -> numberBlockedTaskManagers



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java:
##########
@@ -65,6 +73,14 @@ public int getNumberFreeSlots() {
         return numberFreeSlots;
     }
 
+    public int getNumberBlockedTaskManagaers() {

Review Comment:
   getNumberBlockedTaskManagaers -> getNumberBlockedTaskManagers



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -40,13 +43,25 @@ public class ArchivedExecutionVertex implements 
AccessExecutionVertex, Serializa
 
     private final ArchivedExecution currentExecution; // this field must never 
be null
 
+    private final Collection<AccessExecution> currentExecutions;
+
     // ------------------------------------------------------------------------
 
     public ArchivedExecutionVertex(ExecutionVertex vertex) {
         this.subTaskIndex = vertex.getParallelSubtaskIndex();
         this.executionHistory = getCopyOfExecutionHistory(vertex);
         this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
-        this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+
+        Execution vertexCurrentExecution = vertex.getCurrentExecutionAttempt();
+        assert vertex.getCurrentExecutions().contains(vertexCurrentExecution);
+        currentExecutions = new 
ArrayList<>(vertex.getCurrentExecutions().size());
+        currentExecution = vertexCurrentExecution.archive();
+        currentExecutions.add(currentExecution);
+        for (Execution execution : vertex.getCurrentExecutions()) {
+            if (execution != vertexCurrentExecution) {
+                currentExecutions.add(execution.archive());
+            }
+        }
     }
 
     public ArchivedExecutionVertex(

Review Comment:
   Let's annotate it as `VisibleForTesting` because now it is only used for 
testing purposes.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java:
##########
@@ -46,6 +47,8 @@ public interface AccessExecutionVertex {
      */
     AccessExecution getCurrentExecutionAttempt();
 
+    <T extends AccessExecution> Collection<T> getCurrentExecutions();

Review Comment:
   A java doc should be added for it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java:
##########
@@ -64,6 +66,7 @@ public TaskManagerDetailsInfo(
             @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo 
freeResource,
             @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription 
hardwareDescription,
             @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration 
memoryConfiguration,
+            @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked,

Review Comment:
   Maybe it can just be a `boolean`? See the above comment.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java:
##########
@@ -103,4 +105,33 @@ public void testJobDetailsCompatibleUnmarshalling() throws 
IOException {
 
         assertEquals(expected, unmarshalled);
     }
+
+    @Test
+    public void testJobDetailsWithExecutionAttemptsMarshalling() throws 
JsonProcessingException {
+        Map<String, Map<Integer, Integer>> currentExecutionAttempts = new 
HashMap<>();
+        currentExecutionAttempts.computeIfAbsent("a", k -> new 
HashMap<>()).put(1, 2);
+        currentExecutionAttempts.computeIfAbsent("a", k -> new 
HashMap<>()).put(2, 4);
+        currentExecutionAttempts.computeIfAbsent("b", k -> new 
HashMap<>()).put(3, 1);
+
+        final JobDetails expected =
+                new JobDetails(
+                        new JobID(),
+                        "foobar",
+                        1L,
+                        10L,
+                        9L,
+                        JobStatus.RUNNING,
+                        8L,
+                        new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 3},
+                        42,
+                        currentExecutionAttempts);
+
+        final ObjectMapper objectMapper = 
RestMapperUtils.getStrictObjectMapper();
+
+        final JsonNode marshalled = objectMapper.valueToTree(expected);
+
+        final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, 
JobDetails.class);
+
+        assertEquals(expected, unmarshalled);

Review Comment:
   Flink is migrating to JUnit 5 so that all the new tests should use `JUnit5` 
and `assertJ`.
   You can migrate the existing tests to JUnit5 in a  hotfix commit before 
making functional changes.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java:
##########
@@ -28,14 +28,18 @@ public class ResourceOverview implements Serializable {
     private static final long serialVersionUID = 7618746920569224557L;
 
     private static final ResourceOverview EMPTY_RESOURCE_OVERVIEW =
-            new ResourceOverview(0, 0, 0, ResourceProfile.ZERO, 
ResourceProfile.ZERO);
+            new ResourceOverview(0, 0, 0, 0, 0, ResourceProfile.ZERO, 
ResourceProfile.ZERO);
 
     private final int numberTaskManagers;
 
     private final int numberRegisteredSlots;
 
     private final int numberFreeSlots;
 
+    private final int numberBlockedTaskManagaers;
+
+    private final int numberBlockedSlots;

Review Comment:
   I think `numberBlockedFreeSlots` would be more accurate and easier to 
understand.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java:
##########
@@ -31,6 +31,6 @@ protected Class<ClusterOverviewWithVersion> 
getTestResponseClass() {
 
     @Override
     protected ClusterOverviewWithVersion getTestResponseInstance() {
-        return new ClusterOverviewWithVersion(1, 3, 3, 7, 4, 2, 0, "version", 
"commit");
+        return new ClusterOverviewWithVersion(2, 6, 3, 1, 3, 7, 4, 2, 0, 
"version", "commit");

Review Comment:
   It's better to keep the other params as is and just add numbers of the newly 
introduced params.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java:
##########
@@ -142,22 +142,37 @@ public static class TaskQueryScopeInfo extends 
QueryScopeInfo {
         public final String jobID;
         public final String vertexID;
         public final int subtaskIndex;
+        public final int attemptNum;
 
         public TaskQueryScopeInfo(String jobID, String vertexid, int 
subtaskIndex) {

Review Comment:
   Looks to me there are not many occurrences of the invocations of the 
constructors. So I prefer to change the original constructors and fix the 
tests, instead of adding new ones. 
   
   Besides that, the default attemptNumber should be 0 instead of -1.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -671,7 +674,8 @@ public CompletableFuture<TaskManagerInfoWithSlots> 
requestTaskManagerDetailsInfo
                                     
slotManager.getRegisteredResourceOf(instanceId),
                                     slotManager.getFreeResourceOf(instanceId),
                                     taskExecutor.getHardwareDescription(),
-                                    taskExecutor.getMemoryConfiguration()),
+                                    taskExecutor.getMemoryConfiguration(),
+                                    blocked),

Review Comment:
   nit: can invoke the `isBlockedTaskManager` here, just like others. This 
helps the code to look better.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -630,6 +630,7 @@ public CompletableFuture<Collection<TaskManagerInfo>> 
requestTaskManagerInfo(Tim
             final ResourceID resourceId = taskExecutorEntry.getKey();
             final WorkerRegistration<WorkerType> taskExecutor = 
taskExecutorEntry.getValue();
 
+            boolean blocked = 
blocklistHandler.isBlockedTaskManager(taskExecutor.getResourceID());

Review Comment:
   It's better to make local variables `final`, if possible.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -79,6 +95,11 @@ public ArchivedExecution getCurrentExecutionAttempt() {
         return currentExecution;
     }
 
+    @Override
+    public Collection<AccessExecution> getCurrentExecutions() {
+        return currentExecutions;

Review Comment:
   ```suggestion
           return Collections.unmodifiableCollection(currentExecutions);
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java:
##########
@@ -142,22 +142,37 @@ public static class TaskQueryScopeInfo extends 
QueryScopeInfo {
         public final String jobID;
         public final String vertexID;
         public final int subtaskIndex;
+        public final int attemptNum;

Review Comment:
   I would suggest to name it as `attemptNumber`, which is commonly used in a 
lot of places already.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java:
##########
@@ -125,6 +136,7 @@ public TaskManagerInfo(
         this.freeResource = freeResource;
         this.hardwareDescription = 
Preconditions.checkNotNull(hardwareDescription);
         this.memoryConfiguration = 
Preconditions.checkNotNull(memoryConfiguration);
+        this.blocked = blocked != null && blocked;

Review Comment:
   I prefer to add a bracket here to clearly show the order of the result 
computation.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java:
##########
@@ -495,6 +496,10 @@ protected List<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> initiali
         final SubtaskMetricsHandler subtaskMetricsHandler =
                 new SubtaskMetricsHandler(leaderRetriever, timeout, 
responseHeaders, metricFetcher);
 
+        final SubtaskAttemptMetricsHandler subtaskAttemptMetricsHandler =

Review Comment:
   I think this REST API is not in the scope of this FLIP. Therefore, I think 
we should remove it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -40,13 +43,25 @@ public class ArchivedExecutionVertex implements 
AccessExecutionVertex, Serializa
 
     private final ArchivedExecution currentExecution; // this field must never 
be null
 
+    private final Collection<AccessExecution> currentExecutions;
+
     // ------------------------------------------------------------------------
 
     public ArchivedExecutionVertex(ExecutionVertex vertex) {
         this.subTaskIndex = vertex.getParallelSubtaskIndex();
         this.executionHistory = getCopyOfExecutionHistory(vertex);
         this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
-        this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+
+        Execution vertexCurrentExecution = vertex.getCurrentExecutionAttempt();
+        assert vertex.getCurrentExecutions().contains(vertexCurrentExecution);

Review Comment:
   Better to use `Preconditions.checkState`.
   
   And it's better to check it in a more performant way, like:
   ```
   checkState(vertexCurrentExecution == 
vertex.getCurrentExecution(vertexCurrentExecution.getAttemptNumber));
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java:
##########
@@ -113,7 +123,8 @@ public TaskManagerInfo(
             @JsonProperty(FIELD_NAME_TOTAL_RESOURCE) ResourceProfileInfo 
totalResource,
             @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo 
freeResource,
             @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription 
hardwareDescription,
-            @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration 
memoryConfiguration) {
+            @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration 
memoryConfiguration,
+            @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked) {

Review Comment:
   Why declaring `blocked` to be a `@Nullable Boolean` if we finally will turn 
it into a `boolean` and exclude it if it is false?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java:
##########
@@ -174,23 +189,44 @@ public static class OperatorQueryScopeInfo extends 
QueryScopeInfo {
         public final String jobID;
         public final String vertexID;
         public final int subtaskIndex;
+        public final int attemptNum;
         public final String operatorName;
 
         public OperatorQueryScopeInfo(

Review Comment:
   Looks to me there are not many occurrences of the invocations of the 
constructors. So I prefer to change the original constructors and fix the 
tests, instead of adding new ones. 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java:
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for the {@link SpeculativeExecutionVertex}. */
+class ArchivedExecutionVertexWithSpeculativeExecutionTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private TestingInternalFailuresListener internalFailuresListener;
+
+    @BeforeEach
+    void setUp() {
+        internalFailuresListener = new TestingInternalFailuresListener();
+    }
+
+    @Test
+    void testCreateSpeculativeExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        assertThat(ev.getCurrentExecutions()).hasSize(2);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testResetExecutionVertex() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        e2.cancel();
+        ev.resetForNewExecution();
+
+        assertThat(
+                        ev.getExecutionHistory()
+                                .getHistoricalExecution(0)
+                                .orElseThrow(NullPointerException::new)
+                                .getAttemptId())
+                .isEqualTo(e1.getAttemptId());
+        assertThat(
+                        ev.getExecutionHistory()
+                                .getHistoricalExecution(1)
+                                .orElseThrow(NullPointerException::new)
+                                .getAttemptId())
+                .isEqualTo(e2.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        
assertThat(ev.getCurrentExecutionAttempt().getAttemptNumber()).isEqualTo(2);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testCancel() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.cancel();
+        assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+        assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testSuspend() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.suspend();
+        assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+        assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testFail() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.fail(new Exception("Forced test failure."));
+        assertThat(internalFailuresListener.getFailedTasks())
+                .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testMarkFailed() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.markFailed(new Exception("Forced test failure."));
+        assertThat(internalFailuresListener.getFailedTasks())
+                .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testVertexTerminationAndJobTermination() throws Exception {
+        final JobVertex jobVertex = 
ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph eg = createExecutionGraph(jobGraph);
+        eg.transitionToRunning();
+
+        ExecutionJobVertex jv = eg.getJobVertex(jobVertex.getID());
+        assert jv != null;
+        final SpeculativeExecutionVertex ev = (SpeculativeExecutionVertex) 
jv.getTaskVertices()[0];
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        final CompletableFuture<?> terminationFuture = 
ev.getTerminationFuture();
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        assertThat(terminationFuture.isDone()).isFalse();
+        assertThat(eg.getState()).isSameAs(JobStatus.RUNNING);
+
+        e2.cancel();
+        assertThat(terminationFuture.isDone()).isTrue();
+        assertThat(eg.getState()).isSameAs(JobStatus.FINISHED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveFailedExecutions() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.RUNNING);
+
+        final Execution e2 = ev.createNewSpeculativeExecution(0);
+        e2.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e2.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.currentExecution).isSameAs(e1);
+
+        final Execution e3 = ev.createNewSpeculativeExecution(0);
+        e3.transitionState(ExecutionState.RUNNING);
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.currentExecution).isSameAs(e3);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveTheOnlyCurrentExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testGetExecutionState() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.CANCELED);
+
+        // the latter added state is more likely to reach FINISH state
+        final List<ExecutionState> statesSortedByPriority = new ArrayList<>();
+        statesSortedByPriority.add(ExecutionState.FAILED);
+        statesSortedByPriority.add(ExecutionState.CANCELING);
+        statesSortedByPriority.add(ExecutionState.CREATED);
+        statesSortedByPriority.add(ExecutionState.SCHEDULED);
+        statesSortedByPriority.add(ExecutionState.DEPLOYING);
+        statesSortedByPriority.add(ExecutionState.INITIALIZING);
+        statesSortedByPriority.add(ExecutionState.RUNNING);
+        statesSortedByPriority.add(ExecutionState.FINISHED);
+
+        for (ExecutionState state : statesSortedByPriority) {
+            final Execution execution = ev.createNewSpeculativeExecution(0);
+            execution.transitionState(state);
+        }
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    private SpeculativeExecutionVertex createSpeculativeExecutionVertex() 
throws Exception {
+        final JobVertex jobVertex = 
ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        ExecutionJobVertex jv = executionGraph.getJobVertex(jobVertex.getID());
+        assert jv != null;
+        return (SpeculativeExecutionVertex) jv.getTaskVertices()[0];
+    }
+
+    private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) 
throws Exception {
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .setExecutionJobVertexFactory(new 
SpeculativeExecutionJobVertex.Factory())
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        
executionGraph.setInternalTaskFailuresListener(internalFailuresListener);
+        
executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+        return executionGraph;
+    }
+
+    private static void compareExecutionVertex(
+            AccessExecutionVertex runtimeVertex, AccessExecutionVertex 
archivedVertex) {
+        assertEquals(

Review Comment:
   JUnit assertions should be avoided. See 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to