ifndef-SleePy commented on a change in pull request #9568: [Flink-12164][tests] 
Harden JobMasterTest#testJobFailureWhenTaskExecutorHeartbeatTimeout
URL: https://github.com/apache/flink/pull/9568#discussion_r319804664
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
 ##########
 @@ -18,12 +18,175 @@
 
 package org.apache.flink.runtime.heartbeat;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A {@link HeartbeatServices} implementation for testing purposes.
+ * This implementation is able to trigger a timeout of specific component 
manually.
  */
 public class TestingHeartbeatServices extends HeartbeatServices {
 
+       private static final long DEFAULT_HEARTBEAT_TIMEOUT = 10000L;
+
+       private static final long DEFAULT_HEARTBEAT_INTERVAL = 1000L;
+
+       private final Map<ResourceID, HeartbeatManagerImpl> heartbeatManagers = 
new ConcurrentHashMap<>();
+
+       private final Map<ResourceID, HeartbeatManagerSenderImpl> 
heartbeatManagerSenders = new ConcurrentHashMap<>();
+
        public TestingHeartbeatServices() {
-               super(1000L, 10000L);
+               super(DEFAULT_HEARTBEAT_INTERVAL, DEFAULT_HEARTBEAT_TIMEOUT);
+       }
+
+       public TestingHeartbeatServices(long heartbeatInterval) {
+               super(heartbeatInterval, DEFAULT_HEARTBEAT_TIMEOUT);
+       }
+
+       public TestingHeartbeatServices(long heartbeatInterval, long 
heartbeatTimeout) {
+               super(heartbeatInterval, heartbeatTimeout);
+       }
+
+       @Override
+       public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
+               ResourceID resourceId,
+               HeartbeatListener<I, O> heartbeatListener,
+               ScheduledExecutor mainThreadExecutor,
+               Logger log) {
+
+               HeartbeatManagerImpl<I, O> heartbeatManager = new 
HeartbeatManagerImpl<>(
+                       heartbeatTimeout,
+                       resourceId,
+                       heartbeatListener,
+                       mainThreadExecutor,
+                       log,
+                       new TestingHeartbeatMonitorFactory<>());
+
+               heartbeatManagers.put(resourceId, heartbeatManager);
+               return heartbeatManager;
+       }
+
+       @Override
+       public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
+               ResourceID resourceId,
+               HeartbeatListener<I, O> heartbeatListener,
+               ScheduledExecutor mainThreadExecutor,
+               Logger log) {
+
+               HeartbeatManagerSenderImpl<I, O> heartbeatManager = new 
HeartbeatManagerSenderImpl<>(
+                       heartbeatInterval,
+                       heartbeatTimeout,
+                       resourceId,
+                       heartbeatListener,
+                       mainThreadExecutor,
+                       log,
+                       new TestingHeartbeatMonitorFactory<>());
+
+               heartbeatManagerSenders.put(resourceId, heartbeatManager);
+               return heartbeatManager;
+       }
+
+       public void triggerHeartbeatTimeout(ResourceID managerResourceId, 
ResourceID targetResourceId) {
+
+               boolean triggered = false;
+               HeartbeatManagerImpl heartbeatManager = 
heartbeatManagers.get(managerResourceId);
+               if (heartbeatManager != null) {
+                       final TestingHeartbeatMonitor monitor =
+                               (TestingHeartbeatMonitor) 
heartbeatManager.getHeartbeatTargets().get(targetResourceId);
+                       if (monitor != null) {
+                               monitor.triggerHeartbeatTimeout();
+                               triggered = true;
+                       }
+               }
+
+               heartbeatManager = 
heartbeatManagerSenders.get(managerResourceId);
+               if (heartbeatManager != null) {
+                       final TestingHeartbeatMonitor monitor =
+                               (TestingHeartbeatMonitor) 
heartbeatManager.getHeartbeatTargets().get(targetResourceId);
+                       if (monitor != null) {
+                               monitor.triggerHeartbeatTimeout();
+                               triggered = true;
+                       }
+               }
+
+               checkState(triggered,
+                       "There is no target " + targetResourceId + " monitored 
under Heartbeat manager " + managerResourceId);
+       }
+
+       /**
+        * Factory instantiates testing monitor instance.
+        *
+        * @param <O> Type of the outgoing heartbeat payload
+        */
+       static class TestingHeartbeatMonitorFactory<O> implements 
HeartbeatMonitor.Factory<O> {
+
+               @Override
+               public HeartbeatMonitor<O> createHeartbeatMonitor(
+                       ResourceID resourceID,
+                       HeartbeatTarget<O> heartbeatTarget,
+                       ScheduledExecutor mainThreadExecutor,
+                       HeartbeatListener<?, O> heartbeatListener,
+                       long heartbeatTimeoutIntervalMs) {
+
+                       return new TestingHeartbeatMonitor<>(
+                               resourceID,
+                               heartbeatTarget,
+                               mainThreadExecutor,
+                               heartbeatListener,
+                               heartbeatTimeoutIntervalMs);
+               }
+       }
+
+       /**
+        * A heartbeat monitor for testing which supports triggering timeout 
manually.
+        *
+        * @param <O> Type of the outgoing heartbeat payload
+        */
+       static class TestingHeartbeatMonitor<O> extends HeartbeatMonitorImpl<O> 
{
 
 Review comment:
   Yes, you are right: it does rely a bit more on the implementation details of 
`HeartbeatMonitorImpl`.
   I think your suggestion could work well.
   
   I'd like to clarify a few details.
   
   I think the heartbeat should keep working normally until a timeout is 
triggered manually. Otherwise we might miss important information, such as an 
unexpected heartbeat timeout. 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to