This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 96800c94a52c58a86d000d9846c2ff8956732c76 Author: ifndef-SleePy <mmyy1...@gmail.com> AuthorDate: Tue Aug 6 20:58:54 2019 +0800 [Flink-12164][tests] Enrich TestingHeartbeatServices by supporting manually triggering timeout --- .../runtime/heartbeat/HeartbeatManagerImpl.java | 6 +- .../heartbeat/HeartbeatManagerSenderImpl.java | 23 ++- .../heartbeat/TestingHeartbeatServices.java | 165 ++++++++++++++++++++- 3 files changed, 188 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java index a0d7f7a..67f2cbe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java @@ -26,7 +26,7 @@ import org.slf4j.Logger; import javax.annotation.concurrent.ThreadSafe; -import java.util.Collection; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** @@ -112,8 +112,8 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> { return heartbeatListener; } - Collection<HeartbeatMonitor<O>> getHeartbeatTargets() { - return heartbeatTargets.values(); + Map<ResourceID, HeartbeatMonitor<O>> getHeartbeatTargets() { + return heartbeatTargets; } //---------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java index a61fe34..4c00f89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java @@ -43,12 +43,31 @@ public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> HeartbeatListener<I, O> heartbeatListener, ScheduledExecutor mainThreadExecutor, Logger log) { + this( + heartbeatPeriod, + heartbeatTimeout, + ownResourceID, + heartbeatListener, + mainThreadExecutor, + log, + new HeartbeatMonitorImpl.Factory<>()); + } + + HeartbeatManagerSenderImpl( + long heartbeatPeriod, + long heartbeatTimeout, + ResourceID ownResourceID, + HeartbeatListener<I, O> heartbeatListener, + ScheduledExecutor mainThreadExecutor, + Logger log, + HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) { super( heartbeatTimeout, ownResourceID, heartbeatListener, mainThreadExecutor, - log); + log, + heartbeatMonitorFactory); this.heartbeatPeriod = heartbeatPeriod; mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS); @@ -58,7 +77,7 @@ public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> public void run() { if (!stopped) { log.debug("Trigger heartbeat request."); - for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets()) { + for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) { requestHeartbeat(heartbeatMonitor); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java index 9de099a..3fb5c6d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java @@ -18,12 +18,175 @@ package org.apache.flink.runtime.heartbeat; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; + +import org.slf4j.Logger; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.flink.util.Preconditions.checkState; + /** * A {@link HeartbeatServices} implementation for testing purposes. + * This implementation is able to trigger a timeout of specific component manually. */ public class TestingHeartbeatServices extends HeartbeatServices { + private static final long DEFAULT_HEARTBEAT_TIMEOUT = 10000L; + + private static final long DEFAULT_HEARTBEAT_INTERVAL = 1000L; + + private final Map<ResourceID, HeartbeatManagerImpl> heartbeatManagers = new ConcurrentHashMap<>(); + + private final Map<ResourceID, HeartbeatManagerSenderImpl> heartbeatManagerSenders = new ConcurrentHashMap<>(); + public TestingHeartbeatServices() { - super(1000L, 10000L); + super(DEFAULT_HEARTBEAT_INTERVAL, DEFAULT_HEARTBEAT_TIMEOUT); + } + + public TestingHeartbeatServices(long heartbeatInterval) { + super(heartbeatInterval, DEFAULT_HEARTBEAT_TIMEOUT); + } + + public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout) { + super(heartbeatInterval, heartbeatTimeout); + } + + @Override + public <I, O> HeartbeatManager<I, O> createHeartbeatManager( + ResourceID resourceId, + HeartbeatListener<I, O> heartbeatListener, + ScheduledExecutor mainThreadExecutor, + Logger log) { + + HeartbeatManagerImpl<I, O> heartbeatManager = new HeartbeatManagerImpl<>( + heartbeatTimeout, + resourceId, + heartbeatListener, + mainThreadExecutor, + log, + new TestingHeartbeatMonitorFactory<>()); + + heartbeatManagers.put(resourceId, heartbeatManager); + return heartbeatManager; + } + + @Override + public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender( + ResourceID resourceId, + HeartbeatListener<I, O> heartbeatListener, + ScheduledExecutor mainThreadExecutor, + Logger log) { + + HeartbeatManagerSenderImpl<I, O> heartbeatManager = new HeartbeatManagerSenderImpl<>( + heartbeatInterval, + heartbeatTimeout, + resourceId, + heartbeatListener, + mainThreadExecutor, + log, + new TestingHeartbeatMonitorFactory<>()); + + heartbeatManagerSenders.put(resourceId, heartbeatManager); + return heartbeatManager; + } + + public void triggerHeartbeatTimeout(ResourceID managerResourceId, ResourceID targetResourceId) { + + boolean triggered = false; + HeartbeatManagerImpl heartbeatManager = heartbeatManagers.get(managerResourceId); + if (heartbeatManager != null) { + final TestingHeartbeatMonitor monitor = + (TestingHeartbeatMonitor) heartbeatManager.getHeartbeatTargets().get(targetResourceId); + if (monitor != null) { + monitor.triggerHeartbeatTimeout(); + triggered = true; + } + } + + heartbeatManager = heartbeatManagerSenders.get(managerResourceId); + if (heartbeatManager != null) { + final TestingHeartbeatMonitor monitor = + (TestingHeartbeatMonitor) heartbeatManager.getHeartbeatTargets().get(targetResourceId); + if (monitor != null) { + monitor.triggerHeartbeatTimeout(); + triggered = true; + } + } + + checkState(triggered, + "There is no target " + targetResourceId + " monitored under Heartbeat manager " + managerResourceId); + } + + /** + * Factory instantiates testing monitor instance. + * + * @param <O> Type of the outgoing heartbeat payload + */ + static class TestingHeartbeatMonitorFactory<O> implements HeartbeatMonitor.Factory<O> { + + @Override + public HeartbeatMonitor<O> createHeartbeatMonitor( + ResourceID resourceID, + HeartbeatTarget<O> heartbeatTarget, + ScheduledExecutor mainThreadExecutor, + HeartbeatListener<?, O> heartbeatListener, + long heartbeatTimeoutIntervalMs) { + + return new TestingHeartbeatMonitor<>( + resourceID, + heartbeatTarget, + mainThreadExecutor, + heartbeatListener, + heartbeatTimeoutIntervalMs); + } + } + + /** + * A heartbeat monitor for testing which supports triggering timeout manually. + * + * @param <O> Type of the outgoing heartbeat payload + */ + static class TestingHeartbeatMonitor<O> extends HeartbeatMonitorImpl<O> { + + private volatile boolean timeoutTriggered = false; + + TestingHeartbeatMonitor( + ResourceID resourceID, + HeartbeatTarget<O> heartbeatTarget, + ScheduledExecutor scheduledExecutor, + HeartbeatListener<?, O> heartbeatListener, + long heartbeatTimeoutIntervalMs) { + + super(resourceID, heartbeatTarget, scheduledExecutor, heartbeatListener, heartbeatTimeoutIntervalMs); + } + + @Override + public void reportHeartbeat() { + if (!timeoutTriggered) { + super.reportHeartbeat(); + } + // just swallow the heartbeat report + } + + @Override + void resetHeartbeatTimeout(long heartbeatTimeout) { + synchronized (this) { + if (timeoutTriggered) { + super.resetHeartbeatTimeout(0); + } else { + super.resetHeartbeatTimeout(heartbeatTimeout); + } + } + } + + void triggerHeartbeatTimeout() { + synchronized (this) { + timeoutTriggered = true; + resetHeartbeatTimeout(0); + } + } } }