This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 96800c94a52c58a86d000d9846c2ff8956732c76
Author: ifndef-SleePy <mmyy1...@gmail.com>
AuthorDate: Tue Aug 6 20:58:54 2019 +0800

    [Flink-12164][tests] Enrich TestingHeartbeatServices by supporting manually triggering timeout
---
 .../runtime/heartbeat/HeartbeatManagerImpl.java    |   6 +-
 .../heartbeat/HeartbeatManagerSenderImpl.java      |  23 ++-
 .../heartbeat/TestingHeartbeatServices.java        | 165 ++++++++++++++++++++-
 3 files changed, 188 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index a0d7f7a..67f2cbe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
 
 import javax.annotation.concurrent.ThreadSafe;
 
-import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -112,8 +112,8 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
                return heartbeatListener;
        }
 
-       Collection<HeartbeatMonitor<O>> getHeartbeatTargets() {
-               return heartbeatTargets.values();
+       Map<ResourceID, HeartbeatMonitor<O>> getHeartbeatTargets() {
+               return heartbeatTargets;
        }
 
        //----------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
index a61fe34..4c00f89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
@@ -43,12 +43,31 @@ public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O>
                        HeartbeatListener<I, O> heartbeatListener,
                        ScheduledExecutor mainThreadExecutor,
                        Logger log) {
+               this(
+                       heartbeatPeriod,
+                       heartbeatTimeout,
+                       ownResourceID,
+                       heartbeatListener,
+                       mainThreadExecutor,
+                       log,
+                       new HeartbeatMonitorImpl.Factory<>());
+       }
+
+       HeartbeatManagerSenderImpl(
+                       long heartbeatPeriod,
+                       long heartbeatTimeout,
+                       ResourceID ownResourceID,
+                       HeartbeatListener<I, O> heartbeatListener,
+                       ScheduledExecutor mainThreadExecutor,
+                       Logger log,
+                       HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
                super(
                        heartbeatTimeout,
                        ownResourceID,
                        heartbeatListener,
                        mainThreadExecutor,
-                       log);
+                       log,
+                       heartbeatMonitorFactory);
 
                this.heartbeatPeriod = heartbeatPeriod;
                mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
@@ -58,7 +77,7 @@ public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O>
        public void run() {
                if (!stopped) {
                        log.debug("Trigger heartbeat request.");
-                       for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets()) {
+                       for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
                                requestHeartbeat(heartbeatMonitor);
                        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
index 9de099a..3fb5c6d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
@@ -18,12 +18,175 @@
 
 package org.apache.flink.runtime.heartbeat;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A {@link HeartbeatServices} implementation for testing purposes.
+ * This implementation is able to trigger a timeout of specific component manually.
  */
 public class TestingHeartbeatServices extends HeartbeatServices {
 
+       private static final long DEFAULT_HEARTBEAT_TIMEOUT = 10000L;
+
+       private static final long DEFAULT_HEARTBEAT_INTERVAL = 1000L;
+
+       private final Map<ResourceID, HeartbeatManagerImpl> heartbeatManagers = new ConcurrentHashMap<>();
+
+       private final Map<ResourceID, HeartbeatManagerSenderImpl> heartbeatManagerSenders = new ConcurrentHashMap<>();
+
        public TestingHeartbeatServices() {
-               super(1000L, 10000L);
+               super(DEFAULT_HEARTBEAT_INTERVAL, DEFAULT_HEARTBEAT_TIMEOUT);
+       }
+
+       public TestingHeartbeatServices(long heartbeatInterval) {
+               super(heartbeatInterval, DEFAULT_HEARTBEAT_TIMEOUT);
+       }
+
+       public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
+               super(heartbeatInterval, heartbeatTimeout);
+       }
+
+       @Override
+       public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
+               ResourceID resourceId,
+               HeartbeatListener<I, O> heartbeatListener,
+               ScheduledExecutor mainThreadExecutor,
+               Logger log) {
+
+               HeartbeatManagerImpl<I, O> heartbeatManager = new HeartbeatManagerImpl<>(
+                       heartbeatTimeout,
+                       resourceId,
+                       heartbeatListener,
+                       mainThreadExecutor,
+                       log,
+                       new TestingHeartbeatMonitorFactory<>());
+
+               heartbeatManagers.put(resourceId, heartbeatManager);
+               return heartbeatManager;
+       }
+
+       @Override
+       public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
+               ResourceID resourceId,
+               HeartbeatListener<I, O> heartbeatListener,
+               ScheduledExecutor mainThreadExecutor,
+               Logger log) {
+
+               HeartbeatManagerSenderImpl<I, O> heartbeatManager = new HeartbeatManagerSenderImpl<>(
+                       heartbeatInterval,
+                       heartbeatTimeout,
+                       resourceId,
+                       heartbeatListener,
+                       mainThreadExecutor,
+                       log,
+                       new TestingHeartbeatMonitorFactory<>());
+
+               heartbeatManagerSenders.put(resourceId, heartbeatManager);
+               return heartbeatManager;
+       }
+
+       public void triggerHeartbeatTimeout(ResourceID managerResourceId, ResourceID targetResourceId) {
+
+               boolean triggered = false;
+               HeartbeatManagerImpl heartbeatManager = heartbeatManagers.get(managerResourceId);
+               if (heartbeatManager != null) {
+                       final TestingHeartbeatMonitor monitor =
+                               (TestingHeartbeatMonitor) heartbeatManager.getHeartbeatTargets().get(targetResourceId);
+                       if (monitor != null) {
+                               monitor.triggerHeartbeatTimeout();
+                               triggered = true;
+                       }
+               }
+
+               heartbeatManager = heartbeatManagerSenders.get(managerResourceId);
+               if (heartbeatManager != null) {
+                       final TestingHeartbeatMonitor monitor =
+                               (TestingHeartbeatMonitor) heartbeatManager.getHeartbeatTargets().get(targetResourceId);
+                       if (monitor != null) {
+                               monitor.triggerHeartbeatTimeout();
+                               triggered = true;
+                       }
+               }
+
+               checkState(triggered,
+                       "There is no target " + targetResourceId + " monitored under Heartbeat manager " + managerResourceId);
+       }
+
+       /**
+        * Factory instantiates testing monitor instance.
+        *
+        * @param <O> Type of the outgoing heartbeat payload
+        */
+       static class TestingHeartbeatMonitorFactory<O> implements HeartbeatMonitor.Factory<O> {
+
+               @Override
+               public HeartbeatMonitor<O> createHeartbeatMonitor(
+                       ResourceID resourceID,
+                       HeartbeatTarget<O> heartbeatTarget,
+                       ScheduledExecutor mainThreadExecutor,
+                       HeartbeatListener<?, O> heartbeatListener,
+                       long heartbeatTimeoutIntervalMs) {
+
+                       return new TestingHeartbeatMonitor<>(
+                               resourceID,
+                               heartbeatTarget,
+                               mainThreadExecutor,
+                               heartbeatListener,
+                               heartbeatTimeoutIntervalMs);
+               }
+       }
+
+       /**
+        * A heartbeat monitor for testing which supports triggering timeout manually.
+        *
+        * @param <O> Type of the outgoing heartbeat payload
+        */
+       static class TestingHeartbeatMonitor<O> extends HeartbeatMonitorImpl<O> {
+
+               private volatile boolean timeoutTriggered = false;
+
+               TestingHeartbeatMonitor(
+                       ResourceID resourceID,
+                       HeartbeatTarget<O> heartbeatTarget,
+                       ScheduledExecutor scheduledExecutor,
+                       HeartbeatListener<?, O> heartbeatListener,
+                       long heartbeatTimeoutIntervalMs) {
+
+                       super(resourceID, heartbeatTarget, scheduledExecutor, heartbeatListener, heartbeatTimeoutIntervalMs);
+               }
+
+               @Override
+               public void reportHeartbeat() {
+                       if (!timeoutTriggered) {
+                               super.reportHeartbeat();
+                       }
+                       // just swallow the heartbeat report
+               }
+
+               @Override
+               void resetHeartbeatTimeout(long heartbeatTimeout) {
+                       synchronized (this) {
+                               if (timeoutTriggered) {
+                                       super.resetHeartbeatTimeout(0);
+                               } else {
+                                       super.resetHeartbeatTimeout(heartbeatTimeout);
+                               }
+                       }
+               }
+
+               void triggerHeartbeatTimeout() {
+                       synchronized (this) {
+                               timeoutTriggered = true;
+                               resetHeartbeatTimeout(0);
+                       }
+               }
        }
 }

Reply via email to