This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e1a74df4427 [FLINK-28771][runtime] Assign speculative execution
attempt with correct CREATED timestamp
e1a74df4427 is described below
commit e1a74df4427e99f4b0f3aaa4e8f4f5ff7cbd044e
Author: Zhu Zhu <[email protected]>
AuthorDate: Tue Aug 2 11:57:42 2022 +0800
[FLINK-28771][runtime] Assign speculative execution attempt with correct
CREATED timestamp
This closes #20411.
---
.../scheduler/adaptivebatch/SpeculativeScheduler.java | 15 ++++++++++-----
.../scheduler/adaptivebatch/SpeculativeSchedulerTest.java | 6 ++++++
2 files changed, 16 insertions(+), 5 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
index a54f34a85cd..7c0a6c806af 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -313,10 +313,11 @@ public class SpeculativeScheduler extends
AdaptiveBatchScheduler
@Override
public void notifySlowTasks(Map<ExecutionVertexID,
Collection<ExecutionAttemptID>> slowTasks) {
+ final long currentTimestamp = System.currentTimeMillis();
numSlowExecutionVertices = slowTasks.size();
// add slow nodes to blocklist before scheduling new speculative
executions
- blockSlowNodes(slowTasks);
+ blockSlowNodes(slowTasks, currentTimestamp);
final List<Execution> newSpeculativeExecutions = new ArrayList<>();
final Set<ExecutionVertexID> verticesToDeploy = new HashSet<>();
@@ -340,7 +341,10 @@ public class SpeculativeScheduler extends
AdaptiveBatchScheduler
final Collection<Execution> attempts =
IntStream.range(0, newSpeculativeExecutionsToDeploy)
-
.mapToObj(executionVertex::createNewSpeculativeExecution)
+ .mapToObj(
+ i ->
+
executionVertex.createNewSpeculativeExecution(
+ currentTimestamp))
.collect(Collectors.toList());
setupSubtaskGatewayForAttempts(executionVertex, attempts);
@@ -354,10 +358,11 @@ public class SpeculativeScheduler extends
AdaptiveBatchScheduler
executionVertexVersioner.getExecutionVertexVersions(verticesToDeploy));
}
- private void blockSlowNodes(Map<ExecutionVertexID,
Collection<ExecutionAttemptID>> slowTasks) {
+ private void blockSlowNodes(
+ Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks,
+ long currentTimestamp) {
if (!blockSlowNodeDuration.isZero()) {
- final long blockedEndTimestamp =
- System.currentTimeMillis() +
blockSlowNodeDuration.toMillis();
+ final long blockedEndTimestamp = currentTimestamp +
blockSlowNodeDuration.toMillis();
final Collection<BlockedNode> nodesToBlock =
getSlowNodeIds(slowTasks).stream()
.map(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
index 9f9619e1d90..b438240a599 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
@@ -132,11 +132,17 @@ class SpeculativeSchedulerTest {
assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(1);
+ final long timestamp = System.currentTimeMillis();
notifySlowTask(scheduler, attempt1);
assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
assertThat(testBlocklistOperations.getAllBlockedNodeIds())
.containsExactly(attempt1.getAssignedResourceLocation().getNodeId());
+
+ final Execution attempt2 = getExecution(ev, 1);
+ assertThat(attempt2.getState()).isEqualTo(ExecutionState.DEPLOYING);
+ assertThat(attempt2.getStateTimestamp(ExecutionState.CREATED))
+ .isGreaterThanOrEqualTo(timestamp);
}
@Test