zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r931837487


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java:
##########
@@ -227,7 +246,8 @@ public boolean equals(Object o) {
                 && Objects.equals(resourceId, that.resourceId)
                 && Objects.equals(address, that.address)
                 && Objects.equals(hardwareDescription, 
that.hardwareDescription)
-                && Objects.equals(memoryConfiguration, 
that.memoryConfiguration);
+                && Objects.equals(memoryConfiguration, 
that.memoryConfiguration)
+                && Objects.equals(blocked, that.blocked);

Review Comment:
   -> `&& blocked == that.blocked;`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -688,15 +688,31 @@ public CompletableFuture<TaskManagerInfoWithSlots> 
requestTaskManagerDetailsInfo
     @Override
     public CompletableFuture<ResourceOverview> requestResourceOverview(Time 
timeout) {
         final int numberSlots = slotManager.getNumberRegisteredSlots();
-        final int numberFreeSlots = slotManager.getNumberFreeSlots();
         final ResourceProfile totalResource = 
slotManager.getRegisteredResource();
-        final ResourceProfile freeResource = slotManager.getFreeResource();
 
+        int numberFreeSlots = slotManager.getNumberFreeSlots();
+        ResourceProfile freeResource = slotManager.getFreeResource();
+
+        int blockedTaskManagers = 0;
+        int totalBlockedSlots = 0;

Review Comment:
   totalBlockedSlots -> totalBlockedFreeSlots



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java:
##########
@@ -67,7 +67,7 @@ public class TaskManagerDetailsHandlerTest extends TestLogger 
{
 
     private TaskManagerDetailsHandler testInstance;
 
-    @Before
+    @BeforeEach

Review Comment:
   This change would be better made as part of the commit that migrates the tests to JUnit 5.
   The usages of Hamcrest should be refactored as well.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -688,15 +688,31 @@ public CompletableFuture<TaskManagerInfoWithSlots> 
requestTaskManagerDetailsInfo
     @Override
     public CompletableFuture<ResourceOverview> requestResourceOverview(Time 
timeout) {
         final int numberSlots = slotManager.getNumberRegisteredSlots();
-        final int numberFreeSlots = slotManager.getNumberFreeSlots();
         final ResourceProfile totalResource = 
slotManager.getRegisteredResource();
-        final ResourceProfile freeResource = slotManager.getFreeResource();
 
+        int numberFreeSlots = slotManager.getNumberFreeSlots();
+        ResourceProfile freeResource = slotManager.getFreeResource();
+
+        int blockedTaskManagers = 0;
+        int totalBlockedSlots = 0;
+        for (WorkerRegistration<WorkerType> registration : 
taskExecutors.values()) {
+            if 
(blocklistHandler.isBlockedTaskManager(registration.getResourceID())) {
+                blockedTaskManagers++;
+                int blockedSlots = 
slotManager.getNumberFreeSlotsOf(registration.getInstanceID());

Review Comment:
   blockedSlots -> blockedFreeSlots



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobStatus;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link SpeculativeExecutionVertex}. */

Review Comment:
   maybe: Tests for the {@link ArchivedExecutionVertex} created from a {@link 
SpeculativeExecutionVertex}.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -156,6 +179,27 @@ public synchronized ComponentMetricStore 
getSubtaskMetricStore(
         return 
ComponentMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex));
     }
 
+    public synchronized ComponentMetricStore getSubtaskAttemptMetricStore(
+            String jobID, String taskID, int subtaskIndex, int attemptId) {
+        JobMetricStore job = jobID == null ? null : jobs.get(jobID);
+        if (job == null) {
+            return null;
+        }
+        TaskMetricStore task = job.getTaskMetricStore(taskID);
+        if (task == null) {
+            return null;
+        }
+        SubtaskMetricStore subtask = task.getSubtaskMetricStore(subtaskIndex);
+        if (attemptId < 0) {

Review Comment:
   A negative attempt number should never happen. Therefore a 
`checkArgument(attemptNumber >= 0)` is enough.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -214,15 +260,33 @@ public void add(MetricDump metric) {
                     task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> 
new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    taskInfo.subtaskIndex, k -> new 
ComponentMetricStore());
+                                    taskInfo.subtaskIndex, k -> new 
SubtaskMetricStore());
+
+                    if (taskInfo.attemptNumber >= 0) {
+                        // Consider as the current attempt if current attempt 
id is not set, which

Review Comment:
   This doc seems to be outdated



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java:
##########
@@ -670,6 +677,95 @@ void 
testNewlyAddedBlockedNodesWillBeSynchronizedToAllRegisteredJobMasters() thr
         
assertThat(receivedBlockedNodes2).containsExactlyInAnyOrder(blockedNode1, 
blockedNode2);
     }
 
+    @Test
+    void testResourceOverviewWithBlockedSlots() throws Exception {
+        ManuallyTriggeredScheduledExecutor executor = new 
ManuallyTriggeredScheduledExecutor();
+        final SlotManager slotManager = 
DeclarativeSlotManagerBuilder.newBuilder(executor).build();
+        resourceManager =
+                new ResourceManagerBuilder()
+                        .withSlotManager(slotManager)
+                        .withBlocklistHandlerFactory(
+                                new 
DefaultBlocklistHandler.Factory(Duration.ofMillis(100L)))
+                        .buildAndStart();
+
+        final ResourceManagerGateway resourceManagerGateway =
+                resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
+        ResourceID taskExecutor = ResourceID.generate();
+        ResourceID taskExecutorToBlock = ResourceID.generate();
+        registerTaskExecutorAndSlot(resourceManagerGateway, taskExecutor, 3);
+        registerTaskExecutorAndSlot(resourceManagerGateway, 
taskExecutorToBlock, 5);
+        executor.triggerAll();
+
+        ResourceOverview overview =
+                
resourceManagerGateway.requestResourceOverview(Time.seconds(5)).get();
+        assertThat(overview.getNumberTaskManagers()).isEqualTo(2);
+        assertThat(overview.getNumberRegisteredSlots()).isEqualTo(8);
+        assertThat(overview.getNumberFreeSlots()).isEqualTo(8);
+        assertThat(overview.getNumberBlockedTaskManagers()).isEqualTo(0);
+        assertThat(overview.getNumberBlockedFreeSlots()).isEqualTo(0);
+        assertThat(overview.getTotalResource())
+                .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8));
+        assertThat(overview.getFreeResource())
+                .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8));
+
+        resourceManagerGateway.notifyNewBlockedNodes(
+                Collections.singleton(
+                        new BlockedNode(
+                                
resourceManager.getNodeIdOfTaskManager(taskExecutorToBlock),
+                                "Test cause",
+                                System.currentTimeMillis())));

Review Comment:
   Better to use `Long.MAX_VALUE` here; otherwise the node may get unblocked in
   another thread, which would make the test unstable.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -214,15 +260,33 @@ public void add(MetricDump metric) {
                     task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> 
new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    taskInfo.subtaskIndex, k -> new 
ComponentMetricStore());
+                                    taskInfo.subtaskIndex, k -> new 
SubtaskMetricStore());
+
+                    if (taskInfo.attemptNumber >= 0) {
+                        // Consider as the current attempt if current attempt 
id is not set, which

Review Comment:
   id -> number



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -177,7 +221,9 @@ public void add(MetricDump metric) {
             TaskManagerMetricStore tm;
             JobMetricStore job;
             TaskMetricStore task;
-            ComponentMetricStore subtask;
+            SubtaskMetricStore subtask;
+            ComponentMetricStore attempt;
+            boolean isCurrentAttempt = true;

Review Comment:
   It seems this ` = true` initializer is of no use.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobStatus;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link SpeculativeExecutionVertex}. */
+class ArchivedExecutionVertexWithSpeculativeExecutionTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private TestingInternalFailuresListener internalFailuresListener;
+
+    @BeforeEach
+    void setUp() {
+        internalFailuresListener = new TestingInternalFailuresListener();
+    }
+
+    @Test
+    void testCreateSpeculativeExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        assertThat(ev.getCurrentExecutions()).hasSize(2);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testResetExecutionVertex() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        e2.cancel();
+        ev.resetForNewExecution();
+
+        assertThat(
+                        ev.getExecutionHistory()
+                                .getHistoricalExecution(0)
+                                .orElseThrow(NullPointerException::new)
+                                .getAttemptId())
+                .isEqualTo(e1.getAttemptId());
+        assertThat(
+                        ev.getExecutionHistory()
+                                .getHistoricalExecution(1)
+                                .orElseThrow(NullPointerException::new)
+                                .getAttemptId())
+                .isEqualTo(e2.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        
assertThat(ev.getCurrentExecutionAttempt().getAttemptNumber()).isEqualTo(2);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testCancel() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.cancel();
+        assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+        assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testSuspend() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.suspend();
+        assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+        assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testFail() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.fail(new Exception("Forced test failure."));
+        assertThat(internalFailuresListener.getFailedTasks())
+                .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testMarkFailed() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.markFailed(new Exception("Forced test failure."));
+        assertThat(internalFailuresListener.getFailedTasks())
+                .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testVertexTerminationAndJobTermination() throws Exception {
+        final JobVertex jobVertex = 
ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph eg = createExecutionGraph(jobGraph);
+        eg.transitionToRunning();
+
+        ExecutionJobVertex jv = eg.getJobVertex(jobVertex.getID());
+        assert jv != null;
+        final SpeculativeExecutionVertex ev = (SpeculativeExecutionVertex) 
jv.getTaskVertices()[0];
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        final CompletableFuture<?> terminationFuture = 
ev.getTerminationFuture();
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        assertThat(terminationFuture.isDone()).isFalse();
+        assertThat(eg.getState()).isSameAs(JobStatus.RUNNING);
+
+        e2.cancel();
+        assertThat(terminationFuture.isDone()).isTrue();
+        assertThat(eg.getState()).isSameAs(JobStatus.FINISHED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveFailedExecutions() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.RUNNING);
+
+        final Execution e2 = ev.createNewSpeculativeExecution(0);
+        e2.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e2.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.currentExecution).isSameAs(e1);
+
+        final Execution e3 = ev.createNewSpeculativeExecution(0);
+        e3.transitionState(ExecutionState.RUNNING);
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.currentExecution).isSameAs(e3);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveTheOnlyCurrentExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testGetExecutionState() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.CANCELED);
+
+        // the latter added state is more likely to reach FINISH state
+        final List<ExecutionState> statesSortedByPriority = new ArrayList<>();
+        statesSortedByPriority.add(ExecutionState.FAILED);
+        statesSortedByPriority.add(ExecutionState.CANCELING);
+        statesSortedByPriority.add(ExecutionState.CREATED);
+        statesSortedByPriority.add(ExecutionState.SCHEDULED);
+        statesSortedByPriority.add(ExecutionState.DEPLOYING);
+        statesSortedByPriority.add(ExecutionState.INITIALIZING);
+        statesSortedByPriority.add(ExecutionState.RUNNING);
+        statesSortedByPriority.add(ExecutionState.FINISHED);
+
+        for (ExecutionState state : statesSortedByPriority) {
+            final Execution execution = ev.createNewSpeculativeExecution(0);
+            execution.transitionState(state);
+        }
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    private SpeculativeExecutionVertex createSpeculativeExecutionVertex() 
throws Exception {
+        final JobVertex jobVertex = 
ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        ExecutionJobVertex jv = executionGraph.getJobVertex(jobVertex.getID());
+        assert jv != null;

Review Comment:
   This `assert` should be avoided. `AssertJ` assertions should be used instead.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -156,6 +179,27 @@ public synchronized ComponentMetricStore 
getSubtaskMetricStore(
         return 
ComponentMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex));
     }
 
+    public synchronized ComponentMetricStore getSubtaskAttemptMetricStore(
+            String jobID, String taskID, int subtaskIndex, int attemptId) {

Review Comment:
   attemptId -> attemptNumber



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -214,15 +260,33 @@ public void add(MetricDump metric) {
                     task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> 
new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    taskInfo.subtaskIndex, k -> new 
ComponentMetricStore());
+                                    taskInfo.subtaskIndex, k -> new 
SubtaskMetricStore());
+
+                    if (taskInfo.attemptNumber >= 0) {

Review Comment:
   Seems this check is not needed?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobStatus;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link SpeculativeExecutionVertex}. */
+class ArchivedExecutionVertexWithSpeculativeExecutionTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private TestingInternalFailuresListener internalFailuresListener;
+
+    @BeforeEach
+    void setUp() {
+        internalFailuresListener = new TestingInternalFailuresListener();
+    }
+
+    @Test
+    void testCreateSpeculativeExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        assertThat(ev.getCurrentExecutions()).hasSize(1);

Review Comment:
   I think the main verification of this test is
   `ArchivedExecutionGraphTestUtils.compareExecutionVertex(...)`.
   The other assertions are already done in `SpeculativeExecutionVertexTest`
   and can be removed from this test class.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobStatus;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link SpeculativeExecutionVertex}. */
+class ArchivedExecutionVertexWithSpeculativeExecutionTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private TestingInternalFailuresListener internalFailuresListener;
+
+    @BeforeEach
+    void setUp() {
+        internalFailuresListener = new TestingInternalFailuresListener();
+    }
+
+    @Test
+    void testCreateSpeculativeExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        assertThat(ev.getCurrentExecutions()).hasSize(2);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testResetExecutionVertex() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        e2.cancel();
+        ev.resetForNewExecution();
+
+        assertThat(
+                        ev.getExecutionHistory()
+                                .getHistoricalExecution(0)
+                                .orElseThrow(NullPointerException::new)
+                                .getAttemptId())
+                .isEqualTo(e1.getAttemptId());
+        assertThat(
+                        ev.getExecutionHistory()
+                                .getHistoricalExecution(1)
+                                .orElseThrow(NullPointerException::new)
+                                .getAttemptId())
+                .isEqualTo(e2.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        
assertThat(ev.getCurrentExecutionAttempt().getAttemptNumber()).isEqualTo(2);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testCancel() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.cancel();
+        assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+        assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testSuspend() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.suspend();
+        assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+        assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testFail() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.fail(new Exception("Forced test failure."));
+        assertThat(internalFailuresListener.getFailedTasks())
+                .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testMarkFailed() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.markFailed(new Exception("Forced test failure."));
+        assertThat(internalFailuresListener.getFailedTasks())
+                .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testVertexTerminationAndJobTermination() throws Exception {
+        final JobVertex jobVertex = 
ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph eg = createExecutionGraph(jobGraph);
+        eg.transitionToRunning();
+
+        ExecutionJobVertex jv = eg.getJobVertex(jobVertex.getID());
+        assert jv != null;
+        final SpeculativeExecutionVertex ev = (SpeculativeExecutionVertex) 
jv.getTaskVertices()[0];
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = 
ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        final CompletableFuture<?> terminationFuture = 
ev.getTerminationFuture();
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        assertThat(terminationFuture.isDone()).isFalse();
+        assertThat(eg.getState()).isSameAs(JobStatus.RUNNING);
+
+        e2.cancel();
+        assertThat(terminationFuture.isDone()).isTrue();
+        assertThat(eg.getState()).isSameAs(JobStatus.FINISHED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveFailedExecutions() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.RUNNING);
+
+        final Execution e2 = ev.createNewSpeculativeExecution(0);
+        e2.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e2.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.currentExecution).isSameAs(e1);
+
+        final Execution e3 = ev.createNewSpeculativeExecution(0);
+        e3.transitionState(ExecutionState.RUNNING);
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.currentExecution).isSameAs(e3);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveTheOnlyCurrentExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testGetExecutionState() throws Exception {
+        final SpeculativeExecutionVertex ev = 
createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.CANCELED);
+
+        // the latter added state is more likely to reach FINISH state
+        final List<ExecutionState> statesSortedByPriority = new ArrayList<>();
+        statesSortedByPriority.add(ExecutionState.FAILED);
+        statesSortedByPriority.add(ExecutionState.CANCELING);
+        statesSortedByPriority.add(ExecutionState.CREATED);
+        statesSortedByPriority.add(ExecutionState.SCHEDULED);
+        statesSortedByPriority.add(ExecutionState.DEPLOYING);
+        statesSortedByPriority.add(ExecutionState.INITIALIZING);
+        statesSortedByPriority.add(ExecutionState.RUNNING);
+        statesSortedByPriority.add(ExecutionState.FINISHED);
+
+        for (ExecutionState state : statesSortedByPriority) {
+            final Execution execution = ev.createNewSpeculativeExecution(0);
+            execution.transitionState(state);
+        }
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);

Review Comment:
   This check should be inside the loop, so that it runs after each state 
transition.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -54,6 +57,14 @@ public class MetricStore {
     private final Map<String, TaskManagerMetricStore> taskManagers = new 
ConcurrentHashMap<>();
     private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
 
+    /**
+     * The map holds the attempt number of the representing execution for each 
subtask of each
+     * vertex. When a metric of an execution attempt is added, the metric can 
also be added to the
+     * SubtaskMetricStore when it is of the representing execution.
+     */
+    private final Map<String, Map<String, Map<Integer, Integer>>> 
currentExecutionAttempts =

Review Comment:
   Maybe name it `representativeExecutionAttempts` to make it more 
self-explanatory?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##########
@@ -177,7 +221,9 @@ public void add(MetricDump metric) {
             TaskManagerMetricStore tm;
             JobMetricStore job;
             TaskMetricStore task;
-            ComponentMetricStore subtask;
+            SubtaskMetricStore subtask;
+            ComponentMetricStore attempt;
+            boolean isCurrentAttempt = true;

Review Comment:
   Maybe naming it `isRepresentativeAttempt` would be more explanatory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to