This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 38b9c280128 [FLINK-32876][runtime] Prevent ExecutionTimeBasedSlowTaskDetector from identifying tasks in CREATED state as slow tasks.
38b9c280128 is described below
commit 38b9c280128981b3e809df1f963bdaf8c0491804
Author: JunRuiLee <[email protected]>
AuthorDate: Fri Aug 18 10:42:25 2023 +0800
[FLINK-32876][runtime] Prevent ExecutionTimeBasedSlowTaskDetector from identifying tasks in CREATED state as slow tasks.
This closes #23222.
---
.../ExecutionTimeBasedSlowTaskDetector.java | 12 +++++++++++-
.../ExecutionTimeBasedSlowTaskDetectorTest.java | 21 +++++++++++++++++++++
2 files changed, 32 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java
index 34cb5b47b36..f6d08548a0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java
@@ -214,7 +214,17 @@ public class ExecutionTimeBasedSlowTaskDetector implements SlowTaskDetector {
ExecutionTimeWithInputBytes baseline,
long currentTimeMillis) {
return executions.stream()
- .filter(e -> !e.getState().isTerminal() && e.getState() !=
ExecutionState.CANCELING)
+ .filter(
+ // We will filter out tasks that are in the CREATED
state, as we do not
+ // allow speculative execution for them because they
have not been
+ // scheduled.
+ // However, for tasks that are already in the
SCHEDULED state, we allow
+ // speculative execution to provide the capability of
parallel execution
+ // running.
+ e ->
+ !e.getState().isTerminal()
+ && e.getState() !=
ExecutionState.CANCELING
+ && e.getState() !=
ExecutionState.CREATED)
.filter(
e -> {
ExecutionTimeWithInputBytes timeWithBytes =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
index b11f86c80d4..1714d79edbf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
@@ -76,6 +76,27 @@ class ExecutionTimeBasedSlowTaskDetectorTest {
assertThat(slowTasks).hasSize(parallelism);
}
+ @Test
+ void testAllTasksInCreatedAndNoSlowTasks() throws Exception {
+ final int parallelism = 3;
+ final JobVertex jobVertex = createNoOpVertex(parallelism);
+ final JobGraph jobGraph =
JobGraphTestUtils.streamingJobGraph(jobVertex);
+
+ // all tasks are in the CREATED state, which is not classified as slow
tasks.
+ final ExecutionGraph executionGraph =
+ SchedulerTestingUtils.createScheduler(
+ jobGraph,
+
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+ EXECUTOR_RESOURCE.getExecutor())
+ .getExecutionGraph();
+
+ final ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
createSlowTaskDetector(0, 1, 0);
+ final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks
=
+ slowTaskDetector.findSlowTasks(executionGraph);
+
+ assertThat(slowTasks.size()).isZero();
+ }
+
@Test
void testFinishedTaskNotExceedRatio() throws Exception {
final int parallelism = 3;