This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ef07590403ad7448a523ef5da78da0306cccdae7 Author: hongli <[email protected]> AuthorDate: Sat Jun 25 17:05:26 2022 +0800 [FLINK-28136][runtime] Implement ExecutionTimeBasedSlowTaskDetector This closes #20072. --- .../generated/expert_scheduling_section.html | 24 ++ .../slow_task_detector_configuration.html | 36 +++ .../configuration/SlowTaskDetectorOptions.java | 71 ++++++ .../runtime/executiongraph/ExecutionVertex.java | 5 + .../ExecutionTimeBasedSlowTaskDetector.java | 234 +++++++++++++++++++ .../ExecutionTimeBasedSlowTaskDetectorTest.java | 250 +++++++++++++++++++++ 6 files changed, 620 insertions(+) diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html b/docs/layouts/shortcodes/generated/expert_scheduling_section.html index 2121a982881..b78d9bfd2db 100644 --- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html +++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html @@ -104,5 +104,29 @@ <td>Integer</td> <td>Defines the maximum number of slots that the Flink cluster allocates. This configuration option is meant for limiting the resource consumption for batch workloads. It is not recommended to configure this option for streaming workloads, which may fail if there are not enough slots. 
Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink.</td> </tr> + <tr> + <td><h5>slow-task-detector.check-interval</h5></td> + <td style="word-wrap: break-word;">1 s</td> + <td>Duration</td> + <td>The interval to check slow tasks.</td> + </tr> + <tr> + <td><h5>slow-task-detector.execution-time.baseline-lower-bound</h5></td> + <td style="word-wrap: break-word;">1 min</td> + <td>Duration</td> + <td>The lower bound of slow task detection baseline.</td> + </tr> + <tr> + <td><h5>slow-task-detector.execution-time.baseline-multiplier</h5></td> + <td style="word-wrap: break-word;">1.5</td> + <td>Double</td> + <td>The multiplier to calculate the slow tasks detection baseline. Given that the parallelism is N and the ratio is R, define T as the median of the first N*R finished tasks' execution time. The baseline will be T*M, where M is the multiplier of the baseline.</td> + </tr> + <tr> + <td><h5>slow-task-detector.execution-time.baseline-ratio</h5></td> + <td style="word-wrap: break-word;">0.75</td> + <td>Double</td> + <td>The finished execution ratio threshold to calculate the slow tasks detection baseline. Given that the parallelism is N and the ratio is R, define T as the median of the first N*R finished tasks' execution time. 
The baseline will be T*M, where M is the multiplier of the baseline.</td> + </tr> </tbody> </table> diff --git a/docs/layouts/shortcodes/generated/slow_task_detector_configuration.html b/docs/layouts/shortcodes/generated/slow_task_detector_configuration.html new file mode 100644 index 00000000000..631bbc7122b --- /dev/null +++ b/docs/layouts/shortcodes/generated/slow_task_detector_configuration.html @@ -0,0 +1,36 @@ +<table class="configuration table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 55%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>slow-task-detector.check-interval</h5></td> + <td style="word-wrap: break-word;">1 s</td> + <td>Duration</td> + <td>The interval to check slow tasks.</td> + </tr> + <tr> + <td><h5>slow-task-detector.execution-time.baseline-lower-bound</h5></td> + <td style="word-wrap: break-word;">1 min</td> + <td>Duration</td> + <td>The lower bound of slow task detection baseline.</td> + </tr> + <tr> + <td><h5>slow-task-detector.execution-time.baseline-multiplier</h5></td> + <td style="word-wrap: break-word;">1.5</td> + <td>Double</td> + <td>The multiplier to calculate the slow tasks detection baseline. Given that the parallelism is N and the ratio is R, define T as the median of the first N*R finished tasks' execution time. The baseline will be T*M, where M is the multiplier of the baseline.</td> + </tr> + <tr> + <td><h5>slow-task-detector.execution-time.baseline-ratio</h5></td> + <td style="word-wrap: break-word;">0.75</td> + <td>Double</td> + <td>The finished execution ratio threshold to calculate the slow tasks detection baseline. Given that the parallelism is N and the ratio is R, define T as the median of the first N*R finished tasks' execution time. 
The baseline will be T*M, where M is the multiplier of the baseline.</td> + </tr> + </tbody> +</table> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SlowTaskDetectorOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SlowTaskDetectorOptions.java new file mode 100644 index 00000000000..12164f7c06e --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/SlowTaskDetectorOptions.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.docs.Documentation; + +import java.time.Duration; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** Configuration options to detect slow tasks. 
*/
+public class SlowTaskDetectorOptions {
+
+    /**
+     * Definition of the detection baseline, shared by the descriptions of {@link
+     * #EXECUTION_TIME_BASELINE_RATIO} and {@link #EXECUTION_TIME_BASELINE_MULTIPLIER}.
+     */
+    private static final String BASELINE_DEFINITION =
+            "Given that the parallelism is N and the ratio is R, define T as the median of the "
+                    + "first N*R finished tasks' execution time. The baseline will be T*M, where "
+                    + "M is the multiplier of the baseline.";
+
+    /** Delay between two consecutive slow task checks. */
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    public static final ConfigOption<Duration> CHECK_INTERVAL =
+            key("slow-task-detector.check-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(1))
+                    .withDescription("The interval to check slow tasks.");
+
+    /** Minimum value the execution-time baseline can take. */
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    public static final ConfigOption<Duration> EXECUTION_TIME_BASELINE_LOWER_BOUND =
+            key("slow-task-detector.execution-time.baseline-lower-bound")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(1))
+                    .withDescription("The lower bound of slow task detection baseline.");
+
+    /** Ratio R of finished tasks required before the baseline of a vertex is computed. */
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    public static final ConfigOption<Double> EXECUTION_TIME_BASELINE_RATIO =
+            key("slow-task-detector.execution-time.baseline-ratio")
+                    .doubleType()
+                    .defaultValue(0.75)
+                    .withDescription(
+                            "The finished execution ratio threshold to calculate the slow tasks "
+                                    + "detection baseline. "
+                                    + BASELINE_DEFINITION);
+
+    /** Multiplier M applied to the median execution time T to obtain the baseline. */
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    public static final ConfigOption<Double> EXECUTION_TIME_BASELINE_MULTIPLIER =
+            key("slow-task-detector.execution-time.baseline-multiplier")
+                    .doubleType()
+                    .defaultValue(1.5)
+                    .withDescription(
+                            "The multiplier to calculate the slow tasks detection baseline. "
+                                    + BASELINE_DEFINITION);
+
+    /** Not meant to be instantiated; this class only holds option constants. */
+    private SlowTaskDetectorOptions() {
+        throw new IllegalAccessError();
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 2cc65061e25..5da5bf7c8b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -39,6 +39,7 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -241,6 +242,10 @@ public class ExecutionVertex
         return currentExecution;
     }
 
+    public Collection<Execution> getCurrentExecutions() {
+        return Collections.singleton(currentExecution);
+    }
+
     @Override
     public ExecutionState getExecutionState() {
         return currentExecution.getState();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java
new file mode 100644
index 00000000000..0df536bc2e8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler.slowtaskdetector; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SlowTaskDetectorOptions; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.IterableUtils; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkState; + +/** The slow task detector which detects slow tasks based on their execution time. 
*/
+public class ExecutionTimeBasedSlowTaskDetector implements SlowTaskDetector {
+
+    /** Delay between two consecutive detection runs, in milliseconds. */
+    private final long checkIntervalMillis;
+
+    /** Lower bound of the slow task baseline, in milliseconds. */
+    private final long baselineLowerBoundMillis;
+
+    /** Minimum ratio of finished subtasks a job vertex needs before it is checked. */
+    private final double baselineRatio;
+
+    /** Factor applied to the median execution time to obtain the baseline. */
+    private final double baselineMultiplier;
+
+    /** Future of the pending detection run; {@code null} until {@link #start} is called. */
+    private ScheduledFuture<?> scheduledDetectionFuture;
+
+    /**
+     * Creates a detector configured by the {@link SlowTaskDetectorOptions} in {@code
+     * configuration}.
+     *
+     * @param configuration the configuration to read the detector options from
+     * @throws IllegalArgumentException if the check interval is not positive, the baseline lower
+     *     bound is negative, the baseline ratio is outside [0, 1), or the baseline multiplier is
+     *     not positive
+     */
+    public ExecutionTimeBasedSlowTaskDetector(Configuration configuration) {
+        // Flink's Preconditions only substitutes "%s" placeholders; SLF4J-style "{}" would be
+        // printed literally with the arguments appended at the end of the message.
+        this.checkIntervalMillis =
+                configuration.get(SlowTaskDetectorOptions.CHECK_INTERVAL).toMillis();
+        checkArgument(
+                this.checkIntervalMillis > 0,
+                "The configuration %s should be positive, but is %s.",
+                SlowTaskDetectorOptions.CHECK_INTERVAL.key(),
+                this.checkIntervalMillis);
+
+        this.baselineLowerBoundMillis =
+                configuration
+                        .get(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND)
+                        .toMillis();
+        checkArgument(
+                this.baselineLowerBoundMillis >= 0,
+                "The configuration %s cannot be negative, but is %s.",
+                SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND.key(),
+                this.baselineLowerBoundMillis);
+
+        this.baselineRatio =
+                configuration.getDouble(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO);
+        // Comparisons with NaN are false, so a NaN ratio is rejected as well.
+        checkArgument(
+                this.baselineRatio >= 0 && this.baselineRatio < 1,
+                "The configuration %s should be in [0, 1), but is %s.",
+                SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO.key(),
+                this.baselineRatio);
+
+        this.baselineMultiplier =
+                configuration.getDouble(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER);
+        checkArgument(
+                this.baselineMultiplier > 0,
+                "The configuration %s should be positive, but is %s.",
+                SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER.key(),
+                this.baselineMultiplier);
+    }
+
+    /**
+     * Starts periodical slow task detection. Detection runs on {@code mainThreadExecutor} every
+     * {@code checkIntervalMillis} and reports its result to {@code listener}.
+     */
+    @Override
+    public void start(
+            final ExecutionGraph executionGraph,
+            final SlowTaskDetectorListener listener,
+            final ComponentMainThreadExecutor mainThreadExecutor) {
+        scheduleTask(executionGraph, listener, mainThreadExecutor);
+    }
+
+    /**
+     * Schedules one detection run after {@code checkIntervalMillis}. The run notifies {@code
+     * listener} of the slow tasks found and then schedules the next run. The next run is only
+     * scheduled after the listener returns, so an exception from detection or from the listener
+     * ends the periodic chain.
+     */
+    private void scheduleTask(
+            final ExecutionGraph executionGraph,
+            final SlowTaskDetectorListener listener,
+            final ComponentMainThreadExecutor mainThreadExecutor) {
+        this.scheduledDetectionFuture =
+                mainThreadExecutor.schedule(
+                        () -> {
+                            listener.notifySlowTasks(findSlowTasks(executionGraph));
+                            scheduleTask(executionGraph, listener, mainThreadExecutor);
+                        },
+                        checkIntervalMillis,
+                        TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Finds the slow tasks of the given execution graph.
+     *
+     * <p>Only job vertices that are initialized, not FINISHED in aggregate, and whose ratio of
+     * finished subtasks is at least the configured ratio are checked. Given that the parallelism
+     * is N and the ratio is R, T is the median of the round(N*R) shortest execution times among
+     * the finished executions of the job vertex. The baseline is max(lower bound, T*M), where M
+     * is the multiplier. A non-terminal, non-CANCELING execution counts as slow if its execution
+     * time is at least the baseline.
+     *
+     * @param executionGraph the execution graph to inspect
+     * @return the attempt IDs of slow executions keyed by execution vertex; vertices without slow
+     *     executions are absent from the map
+     */
+    @VisibleForTesting
+    Map<ExecutionVertexID, Collection<ExecutionAttemptID>> findSlowTasks(
+            final ExecutionGraph executionGraph) {
+        final long currentTimeMillis = System.currentTimeMillis();
+
+        final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks = new HashMap<>();
+
+        final List<ExecutionJobVertex> jobVerticesToCheck = getJobVerticesToCheck(executionGraph);
+
+        for (ExecutionJobVertex ejv : jobVerticesToCheck) {
+            final long baseline = getBaseline(ejv, currentTimeMillis);
+
+            for (ExecutionVertex ev : ejv.getTaskVertices()) {
+                if (ev.getExecutionState().isTerminal()) {
+                    continue;
+                }
+
+                final List<ExecutionAttemptID> slowExecutions =
+                        findExecutionsExceedingBaseline(
+                                ev.getCurrentExecutions(), baseline, currentTimeMillis);
+
+                if (!slowExecutions.isEmpty()) {
+                    slowTasks.put(ev.getID(), slowExecutions);
+                }
+            }
+        }
+
+        return slowTasks;
+    }
+
+    /**
+     * Returns the job vertices, in topological order, that are initialized, not FINISHED in
+     * aggregate, and have a finished ratio of at least {@code baselineRatio}.
+     */
+    private List<ExecutionJobVertex> getJobVerticesToCheck(final ExecutionGraph executionGraph) {
+        return IterableUtils.toStream(executionGraph.getVerticesTopologically())
+                .filter(ExecutionJobVertex::isInitialized)
+                .filter(ejv -> ejv.getAggregateState() != ExecutionState.FINISHED)
+                .filter(ejv -> getFinishedRatio(ejv) >= baselineRatio)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns the fraction of subtasks of {@code executionJobVertex} in state FINISHED.
+     *
+     * @throws IllegalStateException if the job vertex has no task vertices
+     */
+    private double getFinishedRatio(final ExecutionJobVertex executionJobVertex) {
+        checkState(executionJobVertex.getTaskVertices().length > 0);
+        long finishedCount =
+                Arrays.stream(executionJobVertex.getTaskVertices())
+                        .filter(ev -> ev.getExecutionState() == ExecutionState.FINISHED)
+                        .count();
+        return (double) finishedCount / executionJobVertex.getTaskVertices().length;
+    }
+
+    /** Returns max(lower bound, median finished execution time * multiplier), in millis. */
+    private long getBaseline(
+            final ExecutionJobVertex executionJobVertex, final long currentTimeMillis) {
+        final long executionTimeMedian =
+                calculateFinishedTaskExecutionTimeMedian(executionJobVertex, currentTimeMillis);
+        return (long) Math.max(baselineLowerBoundMillis, executionTimeMedian * baselineMultiplier);
+    }
+
+    /**
+     * Returns the median of the round(parallelism * ratio) shortest execution times among the
+     * finished executions of {@code executionJobVertex}, or 0 if that count is 0. For an even
+     * count the upper median is used.
+     */
+    private long calculateFinishedTaskExecutionTimeMedian(
+            final ExecutionJobVertex executionJobVertex, final long currentTimeMillis) {
+
+        final int baselineExecutionCount =
+                (int) Math.round(executionJobVertex.getParallelism() * baselineRatio);
+
+        if (baselineExecutionCount == 0) {
+            return 0;
+        }
+
+        final List<Execution> finishedExecutions =
+                Arrays.stream(executionJobVertex.getTaskVertices())
+                        .flatMap(ev -> ev.getCurrentExecutions().stream())
+                        .filter(e -> e.getState() == ExecutionState.FINISHED)
+                        .collect(Collectors.toList());
+
+        // Callers only pass vertices admitted by getJobVerticesToCheck, whose finished ratio is
+        // at least baselineRatio, so enough finished executions are expected here.
+        checkState(finishedExecutions.size() >= baselineExecutionCount);
+
+        final List<Long> shortestExecutionTimes =
+                finishedExecutions.stream()
+                        .map(e -> getExecutionTime(e, currentTimeMillis))
+                        .sorted()
+                        .limit(baselineExecutionCount)
+                        .collect(Collectors.toList());
+
+        return shortestExecutionTimes.get(baselineExecutionCount / 2);
+    }
+
+    /**
+     * Returns the attempt IDs of the executions that are neither terminal nor CANCELING and
+     * whose execution time is at least {@code baseline}.
+     */
+    private List<ExecutionAttemptID> findExecutionsExceedingBaseline(
+            Collection<Execution> executions, long baseline, long currentTimeMillis) {
+        return executions.stream()
+                .filter(e -> !e.getState().isTerminal() && e.getState() != ExecutionState.CANCELING)
+                .filter(e -> getExecutionTime(e, currentTimeMillis) >= baseline)
+                .map(Execution::getAttemptId)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns the time in millis since the execution reached DEPLOYING: until FINISHED for
+     * finished executions, otherwise until {@code currentTimeMillis}. Returns 0 when the
+     * DEPLOYING timestamp is 0 (presumably meaning the execution was never deployed).
+     */
+    private long getExecutionTime(final Execution execution, final long currentTimeMillis) {
+        final long deployingTimestamp = execution.getStateTimestamp(ExecutionState.DEPLOYING);
+        if (deployingTimestamp == 0) {
+            return 0;
+        }
+
+        if (execution.getState() == ExecutionState.FINISHED) {
+            return execution.getStateTimestamp(ExecutionState.FINISHED) - deployingTimestamp;
+        } else {
+            return currentTimeMillis - deployingTimestamp;
+        }
+    }
+
+    /** Cancels the pending detection run, if any, without interrupting a running one. */
+    @Override
+    public void stop() {
+        if (scheduledDetectionFuture != null) {
+            scheduledDetectionFuture.cancel(false);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
new file mode 100644
index 00000000000..af2624a55be
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.flink.runtime.scheduler.slowtaskdetector; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SlowTaskDetectorOptions; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.SchedulerBase; +import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; +import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerTestUtils; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.testutils.executor.TestExecutorExtension; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ExecutionTimeBasedSlowTaskDetector}. 
*/
+class ExecutionTimeBasedSlowTaskDetectorTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    /** Parallelism of every job vertex whose parallelism is set explicitly. */
+    private static final int PARALLELISM = 3;
+
+    @Test
+    void testNoFinishedTaskButRatioIsZero() throws Exception {
+        final JobVertex source = createNoOpVertex(PARALLELISM);
+        final ExecutionGraph graph = createExecutionGraph(source);
+
+        // A zero ratio makes the baseline equal to the lower bound (0 here), so every running
+        // subtask is reported even though none has finished.
+        final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> result =
+                createSlowTaskDetector(0, 1, 0).findSlowTasks(graph);
+
+        assertThat(result).hasSize(PARALLELISM);
+    }
+
+    @Test
+    void testFinishedTaskNotExceedRatio() throws Exception {
+        final JobVertex source = createNoOpVertex(PARALLELISM);
+        final ExecutionGraph graph = createExecutionGraph(source);
+        final ExecutionTimeBasedSlowTaskDetector detector = createSlowTaskDetector(0.5, 1, 0);
+
+        // 1 of 3 finished subtasks stays below the 0.5 ratio, so the vertex is not checked.
+        finishSubtask(graph, source, 0);
+
+        assertThat(detector.findSlowTasks(graph)).isEmpty();
+    }
+
+    @Test
+    void testFinishedTaskExceedRatio() throws Exception {
+        final JobVertex source = createNoOpVertex(PARALLELISM);
+        final ExecutionGraph graph = createExecutionGraph(source);
+        final ExecutionTimeBasedSlowTaskDetector detector = createSlowTaskDetector(0.3, 1, 0);
+
+        // The finished subtask defines the baseline; the two still-running subtasks have been
+        // executing at least as long, so both are reported.
+        finishSubtask(graph, source, 2);
+
+        assertThat(detector.findSlowTasks(graph)).hasSize(2);
+    }
+
+    @Test
+    void testLargeLowerBound() throws Exception {
+        final JobVertex source = createNoOpVertex(PARALLELISM);
+        final ExecutionGraph graph = createExecutionGraph(source);
+        final ExecutionTimeBasedSlowTaskDetector detector =
+                createSlowTaskDetector(0.3, 1, Integer.MAX_VALUE);
+
+        finishSubtask(graph, source, 2);
+
+        // The lower bound dominates the baseline and no subtask has run that long.
+        assertThat(detector.findSlowTasks(graph)).isEmpty();
+    }
+
+    @Test
+    void testLargeMultiplier() throws Exception {
+        final JobVertex source = createNoOpVertex(PARALLELISM);
+        final ExecutionGraph graph = createExecutionGraph(source);
+        final ExecutionTimeBasedSlowTaskDetector detector =
+                createSlowTaskDetector(0.3, 1_000_000, 0);
+
+        // Let some time pass so the finished subtask gets a non-zero execution time, which the
+        // huge multiplier then turns into a baseline no running subtask can reach.
+        Thread.sleep(10);
+
+        finishSubtask(graph, source, 2);
+
+        assertThat(detector.findSlowTasks(graph)).isEmpty();
+    }
+
+    @Test
+    void testMultipleJobVertexFinishedTaskExceedRatio() throws Exception {
+        final JobVertex upstream = createNoOpVertex(PARALLELISM);
+        final JobVertex downstream = createNoOpVertex(PARALLELISM);
+        downstream.connectNewDataSetAsInput(
+                upstream, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+        final ExecutionGraph graph = createExecutionGraph(upstream, downstream);
+        final ExecutionTimeBasedSlowTaskDetector detector = createSlowTaskDetector(0.3, 1, 0);
+
+        // One finished subtask per job vertex; the two running subtasks of each vertex are slow.
+        finishSubtask(graph, upstream, 2);
+        finishSubtask(graph, downstream, 2);
+
+        assertThat(detector.findSlowTasks(graph)).hasSize(4);
+    }
+
+    @Test
+    void testFinishedTaskExceedRatioInDynamicGraph() throws Exception {
+        final JobVertex upstream = createNoOpVertex(PARALLELISM);
+        // The downstream parallelism is deliberately left unset, so the downstream job vertex
+        // is expected to stay uninitialized and be skipped by the detector.
+        final JobVertex downstream = new JobVertex("vertex2");
+        downstream.setInvokableClass(NoOpInvokable.class);
+        downstream.connectNewDataSetAsInput(
+                upstream, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+        final ExecutionGraph graph = createDynamicExecutionGraph(upstream, downstream);
+        final ExecutionTimeBasedSlowTaskDetector detector = createSlowTaskDetector(0.3, 1, 0);
+
+        finishSubtask(graph, upstream, 2);
+
+        assertThat(detector.findSlowTasks(graph)).hasSize(2);
+    }
+
+    /** Marks the current attempt of subtask {@code subtaskIndex} of {@code jobVertex} finished. */
+    private static void finishSubtask(ExecutionGraph graph, JobVertex jobVertex, int subtaskIndex) {
+        final ExecutionVertex subtask =
+                graph.getJobVertex(jobVertex.getID()).getTaskVertices()[subtaskIndex];
+        subtask.getCurrentExecutionAttempt().markFinished();
+    }
+
+    /** Builds a default scheduler for the vertices and switches all its vertices to RUNNING. */
+    private static ExecutionGraph createExecutionGraph(JobVertex... jobVertices)
+            throws Exception {
+        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertices);
+        return startAndSwitchToRunning(
+                SchedulerTestingUtils.createScheduler(
+                        jobGraph,
+                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        EXECUTOR_RESOURCE.getExecutor()));
+    }
+
+    /** Same as {@link #createExecutionGraph} but backed by an adaptive batch scheduler. */
+    private static ExecutionGraph createDynamicExecutionGraph(JobVertex... jobVertices)
+            throws Exception {
+        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertices);
+        return startAndSwitchToRunning(
+                new AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder(
+                                jobGraph,
+                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                                EXECUTOR_RESOURCE.getExecutor())
+                        .build());
+    }
+
+    /** Starts scheduling and moves every vertex of the scheduler's execution graph to RUNNING. */
+    private static ExecutionGraph startAndSwitchToRunning(SchedulerBase scheduler)
+            throws Exception {
+        final ExecutionGraph graph = scheduler.getExecutionGraph();
+        scheduler.startScheduling();
+        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);
+        return graph;
+    }
+
+    /** Creates a detector with the given baseline ratio, multiplier and lower bound. */
+    private static ExecutionTimeBasedSlowTaskDetector createSlowTaskDetector(
+            double ratio, double multiplier, long lowerBoundMillis) {
+        final Configuration config = new Configuration();
+        config.set(
+                SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND,
+                Duration.ofMillis(lowerBoundMillis));
+        config.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO, ratio);
+        config.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER, multiplier);
+        return new ExecutionTimeBasedSlowTaskDetector(config);
+    }
+}
