This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e1a74df4427 [FLINK-28771][runtime] Assign speculative execution attempt with correct CREATED timestamp
e1a74df4427 is described below

commit e1a74df4427e99f4b0f3aaa4e8f4f5ff7cbd044e
Author: Zhu Zhu <[email protected]>
AuthorDate: Tue Aug 2 11:57:42 2022 +0800

    [FLINK-28771][runtime] Assign speculative execution attempt with correct CREATED timestamp
    
    This closes #20411.
---
 .../scheduler/adaptivebatch/SpeculativeScheduler.java     | 15 ++++++++++-----
 .../scheduler/adaptivebatch/SpeculativeSchedulerTest.java |  6 ++++++
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
index a54f34a85cd..7c0a6c806af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -313,10 +313,11 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler
 
     @Override
     public void notifySlowTasks(Map<ExecutionVertexID, 
Collection<ExecutionAttemptID>> slowTasks) {
+        final long currentTimestamp = System.currentTimeMillis();
         numSlowExecutionVertices = slowTasks.size();
 
         // add slow nodes to blocklist before scheduling new speculative 
executions
-        blockSlowNodes(slowTasks);
+        blockSlowNodes(slowTasks, currentTimestamp);
 
         final List<Execution> newSpeculativeExecutions = new ArrayList<>();
         final Set<ExecutionVertexID> verticesToDeploy = new HashSet<>();
@@ -340,7 +341,10 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler
 
                 final Collection<Execution> attempts =
                         IntStream.range(0, newSpeculativeExecutionsToDeploy)
-                                
.mapToObj(executionVertex::createNewSpeculativeExecution)
+                                .mapToObj(
+                                        i ->
+                                                
executionVertex.createNewSpeculativeExecution(
+                                                        currentTimestamp))
                                 .collect(Collectors.toList());
 
                 setupSubtaskGatewayForAttempts(executionVertex, attempts);
@@ -354,10 +358,11 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler
                 
executionVertexVersioner.getExecutionVertexVersions(verticesToDeploy));
     }
 
-    private void blockSlowNodes(Map<ExecutionVertexID, 
Collection<ExecutionAttemptID>> slowTasks) {
+    private void blockSlowNodes(
+            Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks,
+            long currentTimestamp) {
         if (!blockSlowNodeDuration.isZero()) {
-            final long blockedEndTimestamp =
-                    System.currentTimeMillis() + 
blockSlowNodeDuration.toMillis();
+            final long blockedEndTimestamp = currentTimestamp + 
blockSlowNodeDuration.toMillis();
             final Collection<BlockedNode> nodesToBlock =
                     getSlowNodeIds(slowTasks).stream()
                             .map(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
index 9f9619e1d90..b438240a599 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
@@ -132,11 +132,17 @@ class SpeculativeSchedulerTest {
 
         assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(1);
 
+        final long timestamp = System.currentTimeMillis();
         notifySlowTask(scheduler, attempt1);
 
         assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
         assertThat(testBlocklistOperations.getAllBlockedNodeIds())
                 
.containsExactly(attempt1.getAssignedResourceLocation().getNodeId());
+
+        final Execution attempt2 = getExecution(ev, 1);
+        assertThat(attempt2.getState()).isEqualTo(ExecutionState.DEPLOYING);
+        assertThat(attempt2.getStateTimestamp(ExecutionState.CREATED))
+                .isGreaterThanOrEqualTo(timestamp);
     }
 
     @Test

Reply via email to