This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ef07590403ad7448a523ef5da78da0306cccdae7
Author: hongli <[email protected]>
AuthorDate: Sat Jun 25 17:05:26 2022 +0800

    [FLINK-28136][runtime] Implement ExecutionTimeBasedSlowTaskDetector
    
    This closes #20072.
---
 .../generated/expert_scheduling_section.html       |  24 ++
 .../slow_task_detector_configuration.html          |  36 +++
 .../configuration/SlowTaskDetectorOptions.java     |  71 ++++++
 .../runtime/executiongraph/ExecutionVertex.java    |   5 +
 .../ExecutionTimeBasedSlowTaskDetector.java        | 234 +++++++++++++++++++
 .../ExecutionTimeBasedSlowTaskDetectorTest.java    | 250 +++++++++++++++++++++
 6 files changed, 620 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index 2121a982881..b78d9bfd2db 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -104,5 +104,29 @@
             <td>Integer</td>
             <td>Defines the maximum number of slots that the Flink cluster allocates. This configuration option is meant for limiting the resource consumption for batch workloads. It is not recommended to configure this option for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink.</td>
         </tr>
+        <tr>
+            <td><h5>slow-task-detector.check-interval</h5></td>
+            <td style="word-wrap: break-word;">1 s</td>
+            <td>Duration</td>
+            <td>The interval to check slow tasks.</td>
+        </tr>
+        <tr>
+            <td><h5>slow-task-detector.execution-time.baseline-lower-bound</h5></td>
+            <td style="word-wrap: break-word;">1 min</td>
+            <td>Duration</td>
+            <td>The lower bound of slow task detection baseline.</td>
+        </tr>
+        <tr>
+            <td><h5>slow-task-detector.execution-time.baseline-multiplier</h5></td>
+            <td style="word-wrap: break-word;">1.5</td>
+            <td>Double</td>
+            <td>The multiplier to calculate the slow tasks detection baseline. Given that the parallelism is N and the ratio is R, define T as the median of the first N*R finished tasks' execution time. The baseline will be T*M, where M is the multiplier of the baseline.</td>
+        </tr>
+        <tr>
+            <td><h5>slow-task-detector.execution-time.baseline-ratio</h5></td>
+            <td style="word-wrap: break-word;">0.75</td>
+            <td>Double</td>
+            <td>The finished execution ratio threshold to calculate the slow tasks detection baseline. Given that the parallelism is N and the ratio is R, define T as the median of the first N*R finished tasks' execution time. The baseline will be T*M, where M is the multiplier of the baseline.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/docs/layouts/shortcodes/generated/slow_task_detector_configuration.html b/docs/layouts/shortcodes/generated/slow_task_detector_configuration.html
new file mode 100644
index 00000000000..631bbc7122b
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/slow_task_detector_configuration.html
@@ -0,0 +1,36 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>slow-task-detector.check-interval</h5></td>
+            <td style="word-wrap: break-word;">1 s</td>
+            <td>Duration</td>
+            <td>The interval to check slow tasks.</td>
+        </tr>
+        <tr>
+            <td><h5>slow-task-detector.execution-time.baseline-lower-bound</h5></td>
+            <td style="word-wrap: break-word;">1 min</td>
+            <td>Duration</td>
+            <td>The lower bound of slow task detection baseline.</td>
+        </tr>
+        <tr>
+            <td><h5>slow-task-detector.execution-time.baseline-multiplier</h5></td>
+            <td style="word-wrap: break-word;">1.5</td>
+            <td>Double</td>
+            <td>The multiplier to calculate the slow tasks detection baseline. Given that the parallelism is N and the ratio is R, define T as the median of the first N*R finished tasks' execution time. The baseline will be T*M, where M is the multiplier of the baseline.</td>
+        </tr>
+        <tr>
+            <td><h5>slow-task-detector.execution-time.baseline-ratio</h5></td>
+            <td style="word-wrap: break-word;">0.75</td>
+            <td>Double</td>
+            <td>The finished execution ratio threshold to calculate the slow tasks detection baseline. Given that the parallelism is N and the ratio is R, define T as the median of the first N*R finished tasks' execution time. The baseline will be T*M, where M is the multiplier of the baseline.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SlowTaskDetectorOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SlowTaskDetectorOptions.java
new file mode 100644
index 00000000000..12164f7c06e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SlowTaskDetectorOptions.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.docs.Documentation;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options to detect slow tasks.
+ *
+ * <p>Every option is tagged for the expert scheduling section of the generated documentation.
+ */
+public class SlowTaskDetectorOptions {
+
+    /** Interval between two consecutive slow task checks; defaults to 1 second. */
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    public static final ConfigOption<Duration> CHECK_INTERVAL =
+            key("slow-task-detector.check-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(1))
+                    .withDescription("The interval to check slow tasks.");
+
+    /** Lower bound of the detection baseline; defaults to 1 minute. */
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    public static final ConfigOption<Duration> EXECUTION_TIME_BASELINE_LOWER_BOUND =
+            key("slow-task-detector.execution-time.baseline-lower-bound")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(1))
+                    .withDescription("The lower bound of slow task detection baseline.");
+
+    /** Finished execution ratio (R) used to calculate the baseline; defaults to 0.75. */
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    public static final ConfigOption<Double> EXECUTION_TIME_BASELINE_RATIO =
+            key("slow-task-detector.execution-time.baseline-ratio")
+                    .doubleType()
+                    .defaultValue(0.75)
+                    .withDescription(
+                            "The finished execution ratio threshold to calculate the slow tasks "
+                                    + "detection baseline. Given that the parallelism is N and the "
+                                    + "ratio is R, define T as the median of the first N*R finished "
+                                    + "tasks' execution time. The baseline will be T*M, where M is "
+                                    + "the multiplier of the baseline.");
+
+    /** Multiplier (M) applied to the median execution time; defaults to 1.5. */
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    public static final ConfigOption<Double> EXECUTION_TIME_BASELINE_MULTIPLIER =
+            key("slow-task-detector.execution-time.baseline-multiplier")
+                    .doubleType()
+                    .defaultValue(1.5)
+                    .withDescription(
+                            "The multiplier to calculate the slow tasks detection baseline. Given "
+                                    + "that the parallelism is N and the ratio is R, define T as "
+                                    + "the median of the first N*R finished tasks' execution time. "
+                                    + "The baseline will be T*M, where M is the multiplier of the "
+                                    + "baseline.");
+
+    /** Not instantiable: this class only holds option constants. */
+    private SlowTaskDetectorOptions() {
+        throw new IllegalAccessError();
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 2cc65061e25..5da5bf7c8b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -39,6 +39,7 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -241,6 +242,10 @@ public class ExecutionVertex
         return currentExecution;
     }
 
+    /**
+     * Returns the current executions of this vertex as a collection.
+     *
+     * @return an immutable single-element collection wrapping {@code currentExecution}
+     */
+    public Collection<Execution> getCurrentExecutions() {
+        final Execution onlyExecution = currentExecution;
+        return Collections.singleton(onlyExecution);
+    }
+
     @Override
     public ExecutionState getExecutionState() {
         return currentExecution.getState();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java
new file mode 100644
index 00000000000..0df536bc2e8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.slowtaskdetector;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SlowTaskDetectorOptions;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.IterableUtils;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The slow task detector which detects slow tasks based on their execution time.
+ *
+ * <p>For every job vertex that has enough finished tasks, a baseline is derived from the
+ * execution time of the finished tasks. Executions that are still in progress and whose
+ * execution time has reached the baseline are reported to the listener.
+ */
+public class ExecutionTimeBasedSlowTaskDetector implements SlowTaskDetector {
+
+    /** Delay between two detection rounds, in milliseconds. */
+    private final long checkIntervalMillis;
+
+    /** Smallest value the baseline may take, in milliseconds. */
+    private final long baselineLowerBoundMillis;
+
+    /** Ratio of finished tasks a job vertex must reach before it is checked. */
+    private final double baselineRatio;
+
+    /** Factor applied to the median execution time to obtain the baseline. */
+    private final double baselineMultiplier;
+
+    /** The pending detection round; stays {@code null} until the detector is started. */
+    private ScheduledFuture<?> scheduledDetectionFuture;
+
+    /**
+     * Reads the detector settings from the configuration and validates them.
+     *
+     * @param configuration source of the {@link SlowTaskDetectorOptions} values
+     * @throws IllegalArgumentException if the check interval or the multiplier is not positive,
+     *     the baseline lower bound is negative, or the baseline ratio is outside [0, 1)
+     */
+    public ExecutionTimeBasedSlowTaskDetector(Configuration configuration) {
+        // Note: Flink's Preconditions message templates use "%s" placeholders, not "{}".
+        this.checkIntervalMillis =
+                configuration.get(SlowTaskDetectorOptions.CHECK_INTERVAL).toMillis();
+        checkArgument(
+                this.checkIntervalMillis > 0,
+                "The configuration %s should be positive, but is %s.",
+                SlowTaskDetectorOptions.CHECK_INTERVAL.key(),
+                this.checkIntervalMillis);
+
+        this.baselineLowerBoundMillis =
+                configuration
+                        .get(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND)
+                        .toMillis();
+        checkArgument(
+                this.baselineLowerBoundMillis >= 0,
+                "The configuration %s cannot be negative, but is %s.",
+                SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND.key(),
+                this.baselineLowerBoundMillis);
+
+        this.baselineRatio =
+                configuration.getDouble(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO);
+        checkArgument(
+                this.baselineRatio >= 0 && this.baselineRatio < 1,
+                "The configuration %s should be in [0, 1), but is %s.",
+                SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO.key(),
+                this.baselineRatio);
+
+        this.baselineMultiplier =
+                configuration.getDouble(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER);
+        checkArgument(
+                this.baselineMultiplier > 0,
+                "The configuration %s should be positive, but is %s.",
+                SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER.key(),
+                this.baselineMultiplier);
+    }
+
+    /**
+     * Starts the periodic detection.
+     *
+     * @param executionGraph the graph whose tasks are inspected
+     * @param listener receives the slow tasks found in each round
+     * @param mainThreadExecutor executor on which each detection round is scheduled
+     */
+    @Override
+    public void start(
+            final ExecutionGraph executionGraph,
+            final SlowTaskDetectorListener listener,
+            final ComponentMainThreadExecutor mainThreadExecutor) {
+        scheduleTask(executionGraph, listener, mainThreadExecutor);
+    }
+
+    /**
+     * Schedule periodical slow task detection. One round is scheduled after the check interval;
+     * once it has notified the listener, the round schedules its successor.
+     */
+    private void scheduleTask(
+            final ExecutionGraph executionGraph,
+            final SlowTaskDetectorListener listener,
+            final ComponentMainThreadExecutor mainThreadExecutor) {
+        this.scheduledDetectionFuture =
+                mainThreadExecutor.schedule(
+                        () -> {
+                            listener.notifySlowTasks(findSlowTasks(executionGraph));
+                            scheduleTask(executionGraph, listener, mainThreadExecutor);
+                        },
+                        checkIntervalMillis,
+                        TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Given that the parallelism is N and the ratio is R, define T as the median of the first N*R
+     * finished tasks' execution time. The baseline will be T*M, where M is the multiplier. A task
+     * will be identified as slow if its execution time is longer than the baseline.
+     *
+     * @param executionGraph the graph whose tasks are inspected
+     * @return the slow execution attempts, grouped by the ID of their execution vertex; vertices
+     *     without slow executions are not contained
+     */
+    @VisibleForTesting
+    Map<ExecutionVertexID, Collection<ExecutionAttemptID>> findSlowTasks(
+            final ExecutionGraph executionGraph) {
+        // A single timestamp is used for the whole round so all tasks are measured consistently.
+        final long currentTimeMillis = System.currentTimeMillis();
+
+        final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks = new HashMap<>();
+
+        final List<ExecutionJobVertex> jobVerticesToCheck = getJobVerticesToCheck(executionGraph);
+
+        for (ExecutionJobVertex ejv : jobVerticesToCheck) {
+            final long baseline = getBaseline(ejv, currentTimeMillis);
+
+            for (ExecutionVertex ev : ejv.getTaskVertices()) {
+                if (ev.getExecutionState().isTerminal()) {
+                    continue;
+                }
+
+                final List<ExecutionAttemptID> slowExecutions =
+                        findExecutionsExceedingBaseline(
+                                ev.getCurrentExecutions(), baseline, currentTimeMillis);
+
+                if (!slowExecutions.isEmpty()) {
+                    slowTasks.put(ev.getID(), slowExecutions);
+                }
+            }
+        }
+
+        return slowTasks;
+    }
+
+    /**
+     * Selects the job vertices that are initialized, whose aggregate state is not FINISHED, and
+     * whose finished ratio has reached the configured baseline ratio.
+     */
+    private List<ExecutionJobVertex> getJobVerticesToCheck(final ExecutionGraph executionGraph) {
+        return IterableUtils.toStream(executionGraph.getVerticesTopologically())
+                .filter(ExecutionJobVertex::isInitialized)
+                .filter(ejv -> ejv.getAggregateState() != ExecutionState.FINISHED)
+                .filter(ejv -> getFinishedRatio(ejv) >= baselineRatio)
+                .collect(Collectors.toList());
+    }
+
+    /** Returns the fraction of the job vertex's task vertices that are in state FINISHED. */
+    private double getFinishedRatio(final ExecutionJobVertex executionJobVertex) {
+        checkState(executionJobVertex.getTaskVertices().length > 0);
+        long finishedCount =
+                Arrays.stream(executionJobVertex.getTaskVertices())
+                        .filter(ev -> ev.getExecutionState() == ExecutionState.FINISHED)
+                        .count();
+        return (double) finishedCount / executionJobVertex.getTaskVertices().length;
+    }
+
+    /**
+     * Returns the baseline in milliseconds: the median execution time multiplied by the
+     * configured multiplier, but never less than the configured lower bound.
+     */
+    private long getBaseline(
+            final ExecutionJobVertex executionJobVertex, final long currentTimeMillis) {
+        final long executionTimeMedian =
+                calculateFinishedTaskExecutionTimeMedian(executionJobVertex, currentTimeMillis);
+        return (long) Math.max(baselineLowerBoundMillis, executionTimeMedian * baselineMultiplier);
+    }
+
+    /**
+     * Returns the median execution time of the finished executions used for the baseline.
+     *
+     * <p>The number of executions considered is the parallelism multiplied by the baseline ratio,
+     * rounded. The execution times of all finished executions are sorted ascending, the shortest
+     * ones up to that number are kept, and the element in the middle is returned. Returns 0 if
+     * the number of executions to consider is 0.
+     */
+    private long calculateFinishedTaskExecutionTimeMedian(
+            final ExecutionJobVertex executionJobVertex, final long currentTime) {
+
+        final int baselineExecutionCount =
+                (int) Math.round(executionJobVertex.getParallelism() * baselineRatio);
+
+        if (baselineExecutionCount == 0) {
+            return 0;
+        }
+
+        final List<Execution> finishedExecutions =
+                Arrays.stream(executionJobVertex.getTaskVertices())
+                        .flatMap(ev -> ev.getCurrentExecutions().stream())
+                        .filter(e -> e.getState() == ExecutionState.FINISHED)
+                        .collect(Collectors.toList());
+
+        checkState(finishedExecutions.size() >= baselineExecutionCount);
+
+        final List<Long> firstFinishedExecutions =
+                finishedExecutions.stream()
+                        .map(e -> getExecutionTime(e, currentTime))
+                        .sorted()
+                        .limit(baselineExecutionCount)
+                        .collect(Collectors.toList());
+
+        return firstFinishedExecutions.get(baselineExecutionCount / 2);
+    }
+
+    /**
+     * Returns the attempt IDs of the given executions that are neither terminal nor CANCELING
+     * and whose execution time is at least the baseline.
+     */
+    private List<ExecutionAttemptID> findExecutionsExceedingBaseline(
+            Collection<Execution> executions, long baseline, long currentTimeMillis) {
+        return executions.stream()
+                .filter(e -> !e.getState().isTerminal() && e.getState() != ExecutionState.CANCELING)
+                .filter(e -> getExecutionTime(e, currentTimeMillis) >= baseline)
+                .map(Execution::getAttemptId)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns the execution time in milliseconds, measured from the DEPLOYING timestamp.
+     *
+     * <p>For a FINISHED execution the FINISHED timestamp is the end point, otherwise the given
+     * current time. Returns 0 if the DEPLOYING timestamp is 0 (presumably the execution has not
+     * been deployed yet).
+     */
+    private long getExecutionTime(final Execution execution, final long currentTime) {
+        final long deployingTimestamp = execution.getStateTimestamp(ExecutionState.DEPLOYING);
+        if (deployingTimestamp == 0) {
+            return 0;
+        }
+
+        if (execution.getState() == ExecutionState.FINISHED) {
+            return execution.getStateTimestamp(ExecutionState.FINISHED) - deployingTimestamp;
+        } else {
+            return currentTime - deployingTimestamp;
+        }
+    }
+
+    /** Cancels the pending detection round, if any, without interrupting a running one. */
+    @Override
+    public void stop() {
+        if (scheduledDetectionFuture != null) {
+            scheduledDetectionFuture.cancel(false);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
new file mode 100644
index 00000000000..af2624a55be
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.slowtaskdetector;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SlowTaskDetectorOptions;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerTestUtils;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link ExecutionTimeBasedSlowTaskDetector}.
+ *
+ * <p>Each test builds an execution graph whose vertices are switched to running, optionally
+ * marks some executions as finished, and checks the result of a single detection round.
+ */
+class ExecutionTimeBasedSlowTaskDetectorTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    /** With ratio 0 and lower bound 0 the baseline is 0, so every running task is reported. */
+    @Test
+    void testNoFinishedTaskButRatioIsZero() throws Exception {
+        final int parallelism = 3;
+        final JobVertex jobVertex = createNoOpVertex(parallelism);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
+
+        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0, 1, 0);
+
+        final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
+                slowTaskDetector.findSlowTasks(executionGraph);
+
+        assertThat(slowTasks).hasSize(parallelism);
+    }
+
+    /** One finished task out of three is below the ratio 0.5, so the vertex is not checked. */
+    @Test
+    void testFinishedTaskNotExceedRatio() throws Exception {
+        final int parallelism = 3;
+        final JobVertex jobVertex = createNoOpVertex(parallelism);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
+
+        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
+                createSlowTaskDetector(0.5, 1, 0);
+        final ExecutionVertex ev1 =
+                executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0];
+        ev1.getCurrentExecutionAttempt().markFinished();
+
+        final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
+                slowTaskDetector.findSlowTasks(executionGraph);
+
+        assertThat(slowTasks).isEmpty();
+    }
+
+    /** One finished task out of three reaches the ratio 0.3; the other two are reported. */
+    @Test
+    void testFinishedTaskExceedRatio() throws Exception {
+        final int parallelism = 3;
+        final JobVertex jobVertex = createNoOpVertex(parallelism);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
+
+        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
+                createSlowTaskDetector(0.3, 1, 0);
+
+        // ev3 transitions to DEPLOYING later so that its execution time is the shortest
+        final ExecutionVertex ev3 =
+                executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[2];
+        ev3.getCurrentExecutionAttempt().markFinished();
+
+        final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
+                slowTaskDetector.findSlowTasks(executionGraph);
+
+        assertThat(slowTasks).hasSize(2);
+    }
+
+    /** A very large baseline lower bound prevents any task from being reported. */
+    @Test
+    void testLargeLowerBound() throws Exception {
+        final int parallelism = 3;
+        final JobVertex jobVertex = createNoOpVertex(parallelism);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
+
+        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
+                createSlowTaskDetector(0.3, 1, Integer.MAX_VALUE);
+
+        final ExecutionVertex ev3 =
+                executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[2];
+        ev3.getCurrentExecutionAttempt().markFinished();
+
+        final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
+                slowTaskDetector.findSlowTasks(executionGraph);
+
+        // no task can exceed the large baseline
+        assertThat(slowTasks).isEmpty();
+    }
+
+    /** A very large multiplier prevents any task from being reported. */
+    @Test
+    void testLargeMultiplier() throws Exception {
+        final int parallelism = 3;
+        final JobVertex jobVertex = createNoOpVertex(parallelism);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
+
+        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
+                createSlowTaskDetector(0.3, 1_000_000, 0);
+
+        // Let some time pass before finishing ev3; presumably this keeps its execution time
+        // above 0, as a median of 0 would yield a baseline of 0 regardless of the multiplier.
+        Thread.sleep(10);
+
+        final ExecutionVertex ev3 =
+                executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[2];
+        ev3.getCurrentExecutionAttempt().markFinished();
+
+        final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
+                slowTaskDetector.findSlowTasks(executionGraph);
+
+        // no task can exceed the large baseline
+        assertThat(slowTasks).isEmpty();
+    }
+
+    /** Two job vertices each reach the ratio; two unfinished tasks per vertex are reported. */
+    @Test
+    void testMultipleJobVertexFinishedTaskExceedRatio() throws Exception {
+        final int parallelism = 3;
+        final JobVertex jobVertex1 = createNoOpVertex(parallelism);
+        final JobVertex jobVertex2 = createNoOpVertex(parallelism);
+        jobVertex2.connectNewDataSetAsInput(
+                jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex1, jobVertex2);
+
+        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
+                createSlowTaskDetector(0.3, 1, 0);
+
+        final ExecutionVertex ev13 =
+                executionGraph.getJobVertex(jobVertex1.getID()).getTaskVertices()[2];
+        ev13.getCurrentExecutionAttempt().markFinished();
+        final ExecutionVertex ev23 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[2];
+        ev23.getCurrentExecutionAttempt().markFinished();
+
+        final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
+                slowTaskDetector.findSlowTasks(executionGraph);
+
+        assertThat(slowTasks).hasSize(4);
+    }
+
+    /**
+     * In a dynamic graph only the two unfinished tasks of jobVertex1 are expected; jobVertex2,
+     * whose parallelism is unset, contributes nothing (presumably because it is not initialized).
+     */
+    @Test
+    void testFinishedTaskExceedRatioInDynamicGraph() throws Exception {
+        final int parallelism = 3;
+        final JobVertex jobVertex1 = createNoOpVertex(parallelism);
+        // create jobVertex2 and leave its parallelism unset
+        final JobVertex jobVertex2 = new JobVertex("vertex2");
+        jobVertex2.setInvokableClass(NoOpInvokable.class);
+        jobVertex2.connectNewDataSetAsInput(
+                jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+        final ExecutionGraph executionGraph = createDynamicExecutionGraph(jobVertex1, jobVertex2);
+
+        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
+                createSlowTaskDetector(0.3, 1, 0);
+
+        final ExecutionVertex ev13 =
+                executionGraph.getJobVertex(jobVertex1.getID()).getTaskVertices()[2];
+        ev13.getCurrentExecutionAttempt().markFinished();
+
+        final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
+                slowTaskDetector.findSlowTasks(executionGraph);
+
+        assertThat(slowTasks).hasSize(2);
+    }
+
+    /** Builds a graph with the default test scheduler and switches all vertices to running. */
+    private ExecutionGraph createExecutionGraph(JobVertex... jobVertices) throws Exception {
+        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertices);
+
+        final SchedulerBase scheduler =
+                SchedulerTestingUtils.createScheduler(
+                        jobGraph,
+                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        EXECUTOR_RESOURCE.getExecutor());
+
+        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
+
+        scheduler.startScheduling();
+        ExecutionGraphTestUtils.switchAllVerticesToRunning(executionGraph);
+
+        return executionGraph;
+    }
+
+    /** Same as {@link #createExecutionGraph} but uses the adaptive batch scheduler builder. */
+    private ExecutionGraph createDynamicExecutionGraph(JobVertex... jobVertices) throws Exception {
+        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertices);
+
+        final SchedulerBase scheduler =
+                new AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder(
+                                jobGraph,
+                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                                EXECUTOR_RESOURCE.getExecutor())
+                        .build();
+
+        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
+
+        scheduler.startScheduling();
+        ExecutionGraphTestUtils.switchAllVerticesToRunning(executionGraph);
+
+        return executionGraph;
+    }
+
+    /**
+     * Creates a detector with the given baseline ratio, baseline multiplier and baseline lower
+     * bound (in milliseconds); the check interval keeps its default value.
+     */
+    private ExecutionTimeBasedSlowTaskDetector createSlowTaskDetector(
+            double ratio, double multiplier, long lowerBoundMillis) {
+
+        final Configuration configuration = new Configuration();
+        configuration.set(
+                SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND,
+                Duration.ofMillis(lowerBoundMillis));
+        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO, ratio);
+        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER, multiplier);
+
+        return new ExecutionTimeBasedSlowTaskDetector(configuration);
+    }
+}

Reply via email to