This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 94ece419632ce57f01558cd0fec9ab0d182d4f4b Author: Zhu Zhu <[email protected]> AuthorDate: Fri Jul 15 15:45:32 2022 +0800 [hotfix][runtime] Strengthen SpeculativeScheduler and its tests Stabilizes tests and better exposes exceptions for troubleshooting. --- .../runtime/scheduler/adaptivebatch/SpeculativeScheduler.java | 10 +++++++++- .../scheduler/adaptivebatch/SpeculativeSchedulerTest.java | 10 +++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java index 919ec05cf1d..6d9302c1226 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java @@ -307,7 +307,15 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler return slowExecutions.stream() .map(id -> getExecutionGraph().getRegisteredExecutions().get(id)) - .map(Execution::getAssignedResourceLocation) + .map( + e -> { + checkNotNull( + e.getAssignedResource(), + "The reported slow node have not been assigned a slot. 
" + + "This is unexpected and indicates that there is " + + "something wrong with the slow task detector."); + return e.getAssignedResourceLocation(); + }) .map(TaskManagerLocation::getNodeId) .collect(Collectors.toSet()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java index 75456468447..ef78ad085f7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.scheduler.adaptivebatch; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SlowTaskDetectorOptions; import org.apache.flink.runtime.blocklist.BlockedNode; import org.apache.flink.runtime.blocklist.BlocklistOperations; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; @@ -59,6 +61,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -394,13 +397,18 @@ class SpeculativeSchedulerTest { private DefaultSchedulerBuilder createSchedulerBuilder( final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor) { + // disable periodical slow task detection to avoid affecting the designed testing process + final Configuration configuration = new Configuration(); + configuration.set(SlowTaskDetectorOptions.CHECK_INTERVAL, Duration.ofDays(1)); + return new DefaultSchedulerBuilder( jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .setBlocklistOperations(testBlocklistOperations) 
.setExecutionOperations(testExecutionOperations) .setFutureExecutor(futureExecutor) .setDelayExecutor(taskRestartExecutor) - .setRestartBackoffTimeStrategy(restartStrategy); + .setRestartBackoffTimeStrategy(restartStrategy) + .setJobMasterConfiguration(configuration); } private static void notifySlowTask(
