This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 94ece419632ce57f01558cd0fec9ab0d182d4f4b
Author: Zhu Zhu <[email protected]>
AuthorDate: Fri Jul 15 15:45:32 2022 +0800

    [hotfix][runtime] Strengthen SpeculativeScheduler and its tests
    
    Stabilizes tests and better exposes exceptions for troubleshooting.
---
 .../runtime/scheduler/adaptivebatch/SpeculativeScheduler.java  | 10 +++++++++-
 .../scheduler/adaptivebatch/SpeculativeSchedulerTest.java      | 10 +++++++++-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
index 919ec05cf1d..6d9302c1226 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -307,7 +307,15 @@ public class SpeculativeScheduler extends 
AdaptiveBatchScheduler
 
         return slowExecutions.stream()
                 .map(id -> 
getExecutionGraph().getRegisteredExecutions().get(id))
-                .map(Execution::getAssignedResourceLocation)
+                .map(
+                        e -> {
+                            checkNotNull(
+                                    e.getAssignedResource(),
+                                    "The reported slow node have not been 
assigned a slot. "
+                                            + "This is unexpected and 
indicates that there is "
+                                            + "something wrong with the slow 
task detector.");
+                            return e.getAssignedResourceLocation();
+                        })
                 .map(TaskManagerLocation::getNodeId)
                 .collect(Collectors.toSet());
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
index 75456468447..ef78ad085f7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
@@ -20,6 +20,8 @@
 package org.apache.flink.runtime.scheduler.adaptivebatch;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SlowTaskDetectorOptions;
 import org.apache.flink.runtime.blocklist.BlockedNode;
 import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -59,6 +61,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -394,13 +397,18 @@ class SpeculativeSchedulerTest {
 
     private DefaultSchedulerBuilder createSchedulerBuilder(
             final JobGraph jobGraph, final ComponentMainThreadExecutor 
mainThreadExecutor) {
+        // disable periodical slow task detection to avoid affecting the 
designed testing process
+        final Configuration configuration = new Configuration();
+        configuration.set(SlowTaskDetectorOptions.CHECK_INTERVAL, 
Duration.ofDays(1));
+
         return new DefaultSchedulerBuilder(
                         jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
                 .setBlocklistOperations(testBlocklistOperations)
                 .setExecutionOperations(testExecutionOperations)
                 .setFutureExecutor(futureExecutor)
                 .setDelayExecutor(taskRestartExecutor)
-                .setRestartBackoffTimeStrategy(restartStrategy);
+                .setRestartBackoffTimeStrategy(restartStrategy)
+                .setJobMasterConfiguration(configuration);
     }
 
     private static void notifySlowTask(

Reply via email to