tillrohrmann commented on a change in pull request #9568: [Flink-12164][tests] 
Harden JobMasterTest#testJobFailureWhenTaskExecutorHeartbeatTimeout
URL: https://github.com/apache/flink/pull/9568#discussion_r319580602
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
 ##########
 @@ -18,12 +18,175 @@
 
 package org.apache.flink.runtime.heartbeat;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A {@link HeartbeatServices} implementation for testing purposes.
+ * This implementation is able to trigger a timeout of specific component 
manually.
  */
 public class TestingHeartbeatServices extends HeartbeatServices {
 
+       private static final long DEFAULT_HEARTBEAT_TIMEOUT = 10000L;
+
+       private static final long DEFAULT_HEARTBEAT_INTERVAL = 1000L;
+
+       private final Map<ResourceID, HeartbeatManagerImpl> heartbeatManagers = 
new ConcurrentHashMap<>();
+
+       private final Map<ResourceID, HeartbeatManagerSenderImpl> 
heartbeatManagerSenders = new ConcurrentHashMap<>();
+
        public TestingHeartbeatServices() {
-               super(1000L, 10000L);
+               super(DEFAULT_HEARTBEAT_INTERVAL, DEFAULT_HEARTBEAT_TIMEOUT);
+       }
+
+       public TestingHeartbeatServices(long heartbeatInterval) {
+               super(heartbeatInterval, DEFAULT_HEARTBEAT_TIMEOUT);
+       }
+
+       public TestingHeartbeatServices(long heartbeatInterval, long 
heartbeatTimeout) {
+               super(heartbeatInterval, heartbeatTimeout);
+       }
+
+       @Override
+       public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
+               ResourceID resourceId,
+               HeartbeatListener<I, O> heartbeatListener,
+               ScheduledExecutor mainThreadExecutor,
+               Logger log) {
+
+               HeartbeatManagerImpl<I, O> heartbeatManager = new 
HeartbeatManagerImpl<>(
+                       heartbeatTimeout,
+                       resourceId,
+                       heartbeatListener,
+                       mainThreadExecutor,
+                       log,
+                       new TestingHeartbeatMonitorFactory<>());
+
+               heartbeatManagers.put(resourceId, heartbeatManager);
+               return heartbeatManager;
+       }
+
+       @Override
+       public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
+               ResourceID resourceId,
+               HeartbeatListener<I, O> heartbeatListener,
+               ScheduledExecutor mainThreadExecutor,
+               Logger log) {
+
+               HeartbeatManagerSenderImpl<I, O> heartbeatManager = new 
HeartbeatManagerSenderImpl<>(
+                       heartbeatInterval,
+                       heartbeatTimeout,
+                       resourceId,
+                       heartbeatListener,
+                       mainThreadExecutor,
+                       log,
+                       new TestingHeartbeatMonitorFactory<>());
+
+               heartbeatManagerSenders.put(resourceId, heartbeatManager);
+               return heartbeatManager;
+       }
+
+       public void triggerHeartbeatTimeout(ResourceID managerResourceId, 
ResourceID targetResourceId) {
+
+               boolean triggered = false;
+               HeartbeatManagerImpl heartbeatManager = 
heartbeatManagers.get(managerResourceId);
+               if (heartbeatManager != null) {
+                       final TestingHeartbeatMonitor monitor =
+                               (TestingHeartbeatMonitor) 
heartbeatManager.getHeartbeatTargets().get(targetResourceId);
+                       if (monitor != null) {
+                               monitor.triggerHeartbeatTimeout();
+                               triggered = true;
+                       }
+               }
+
+               heartbeatManager = 
heartbeatManagerSenders.get(managerResourceId);
+               if (heartbeatManager != null) {
+                       final TestingHeartbeatMonitor monitor =
+                               (TestingHeartbeatMonitor) 
heartbeatManager.getHeartbeatTargets().get(targetResourceId);
+                       if (monitor != null) {
+                               monitor.triggerHeartbeatTimeout();
+                               triggered = true;
+                       }
+               }
+
+               checkState(triggered,
+                       "There is no target " + targetResourceId + " monitored 
under Heartbeat manager " + managerResourceId);
+       }
+
+       /**
+        * Factory instantiates testing monitor instance.
+        *
+        * @param <O> Type of the outgoing heartbeat payload
+        */
+       static class TestingHeartbeatMonitorFactory<O> implements 
HeartbeatMonitor.Factory<O> {
+
+               @Override
+               public HeartbeatMonitor<O> createHeartbeatMonitor(
+                       ResourceID resourceID,
+                       HeartbeatTarget<O> heartbeatTarget,
+                       ScheduledExecutor mainThreadExecutor,
+                       HeartbeatListener<?, O> heartbeatListener,
+                       long heartbeatTimeoutIntervalMs) {
+
+                       return new TestingHeartbeatMonitor<>(
+                               resourceID,
+                               heartbeatTarget,
+                               mainThreadExecutor,
+                               heartbeatListener,
+                               heartbeatTimeoutIntervalMs);
+               }
+       }
+
+       /**
+        * A heartbeat monitor for testing which supports triggering timeout 
manually.
+        *
+        * @param <O> Type of the outgoing heartbeat payload
+        */
+       static class TestingHeartbeatMonitor<O> extends HeartbeatMonitorImpl<O> 
{
 
 Review comment:
   Couldn't we also implement a very simple `HeartbeatMonitor` which keeps a 
reference to the `ScheduledExecutor` and the `HeartbeatListener` and then 
simply calls `scheduledExecutor.execute(() -> 
heartbeatListener.notifyHeartbeatTimeout)` if `triggerHeartbeatTimeout` is 
called? The current implementation relies a lot on the underlying 
implementation details of `HeartbeatMonitorImpl`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to