Repository: flink Updated Branches: refs/heads/master 713c092b2 -> 8765f3241
[FLINK-6076] Refactor HeartbeatManager to extend HeartbeatTarget Remove start method from HeartbeatManager This closes #3555. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8765f324 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8765f324 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8765f324 Branch: refs/heads/master Commit: 8765f324108c6cf20569c271776594b314aa91b7 Parents: 713c092 Author: Till Rohrmann <[email protected]> Authored: Mon Mar 13 16:52:56 2017 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Mar 17 15:52:02 2017 +0100 ---------------------------------------------------------------------- .../runtime/heartbeat/HeartbeatManager.java | 11 +--- .../runtime/heartbeat/HeartbeatManagerImpl.java | 55 +++++++++----------- .../heartbeat/HeartbeatManagerSenderImpl.java | 21 +++++--- .../runtime/heartbeat/HeartbeatTarget.java | 12 ++--- .../runtime/heartbeat/HeartbeatManagerTest.java | 29 +++++------ 5 files changed, 57 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8765f324/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java index 12918ed..a648b1f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; * @param <I> Type of the incoming payload * @param <O> Type of the outgoing payload */ -public interface HeartbeatManager<I, O> { +public interface HeartbeatManager<I, O> extends HeartbeatTarget<I> { /** * Start monitoring a {@link HeartbeatTarget}. Heartbeat timeouts for this target are reported @@ -52,15 +52,6 @@ public interface HeartbeatManager<I, O> { void unmonitorTarget(ResourceID resourceID); /** - * Starts the heartbeat manager with the given {@link HeartbeatListener}. The heartbeat listener - * is notified about heartbeat timeouts and heartbeat payloads are reported and retrieved to - * and from it. - * - * @param heartbeatListener Heartbeat listener associated with the heartbeat manager - */ - void start(HeartbeatListener<I, O> heartbeatListener); - - /** * Stops the heartbeat manager. */ void stop(); http://git-wip-us.apache.org/repos/asf/flink/blob/8765f324/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java index 042f95b..42b1c85 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java @@ -44,7 +44,7 @@ import java.util.concurrent.atomic.AtomicReference; * @param <O> Type of the outgoing heartbeat payload */ @ThreadSafe -public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O>, HeartbeatTarget<I> { +public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> { /** Heartbeat timeout interval in milli seconds */ private final long heartbeatTimeoutIntervalMs; @@ -52,6 +52,9 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O>, Heart /** Resource ID which is used to mark one own's heartbeat signals */ private final ResourceID ownResourceID; + /** Heartbeat listener with which the heartbeat manager has been associated */ + private final HeartbeatListener<I, O> heartbeatListener; + /** Executor service used to run heartbeat timeout notifications */ private final ScheduledExecutorService scheduledExecutorService; @@ -63,28 +66,27 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O>, Heart /** Execution context used to run future callbacks */ private final Executor executor; - /** Heartbeat listener with which the heartbeat manager has been associated */ - private HeartbeatListener<I, O> heartbeatListener; - /** Running state of the heartbeat manager */ protected volatile boolean stopped; public HeartbeatManagerImpl( - long heartbeatTimeoutIntervalMs, - ResourceID ownResourceID, - Executor executor, - ScheduledExecutorService scheduledExecutorService, - Logger log) { + long heartbeatTimeoutIntervalMs, + ResourceID ownResourceID, + HeartbeatListener<I, O> heartbeatListener, + Executor executor, + ScheduledExecutorService scheduledExecutorService, + Logger log) { Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout has to be larger than 0."); this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs; this.ownResourceID = Preconditions.checkNotNull(ownResourceID); + this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener, "heartbeatListener"); this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService); this.log = Preconditions.checkNotNull(log); this.executor = Preconditions.checkNotNull(executor); this.heartbeatTargets = new ConcurrentHashMap<>(16); - stopped = true; + stopped = false; } //---------------------------------------------------------------------------------------------- @@ -150,15 +152,6 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O>, Heart } @Override - public void start(HeartbeatListener<I, O> heartbeatListener) { - Preconditions.checkState(stopped, "Cannot start an already started heartbeat manager."); - - stopped = false; - - this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener); - } - - @Override public void stop() { stopped = true; @@ -174,27 +167,27 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O>, Heart //---------------------------------------------------------------------------------------------- @Override - public void sendHeartbeat(ResourceID resourceID, I payload) { + public void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload) { if (!stopped) { - log.debug("Received heartbeat from {}.", resourceID); - reportHeartbeat(resourceID); + log.debug("Received heartbeat from {}.", heartbeatOrigin); + reportHeartbeat(heartbeatOrigin); - if (payload != null) { - heartbeatListener.reportPayload(resourceID, payload); + if (heartbeatPayload != null) { + heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload); } } } @Override - public void requestHeartbeat(ResourceID resourceID, I payload) { + public void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload) { if (!stopped) { - log.debug("Received heartbeat request from {}.", resourceID); + log.debug("Received heartbeat request from {}.", requestOrigin); - final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(resourceID); + final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin); if (heartbeatTarget != null) { - if (payload != null) { - heartbeatListener.reportPayload(resourceID, payload); + if (heartbeatPayload != null) { + heartbeatListener.reportPayload(requestOrigin, heartbeatPayload); } Future<O> futurePayload = heartbeatListener.retrievePayload(); @@ -203,11 +196,11 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O>, Heart futurePayload.thenAcceptAsync(new AcceptFunction<O>() { @Override public void accept(O retrievedPayload) { - heartbeatTarget.sendHeartbeat(getOwnResourceID(), retrievedPayload); + heartbeatTarget.receiveHeartbeat(getOwnResourceID(), retrievedPayload); } }, executor); } else { - heartbeatTarget.sendHeartbeat(ownResourceID, null); + heartbeatTarget.receiveHeartbeat(ownResourceID, null); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8765f324/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java index 588ba7f..57c8671 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java @@ -40,13 +40,20 @@ public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> private final ScheduledFuture<?> triggerFuture; public HeartbeatManagerSenderImpl( - long heartbeatPeriod, - long heartbeatTimeout, - ResourceID ownResourceID, - ExecutorService executorService, - ScheduledExecutorService scheduledExecutorService, - Logger log) { - super(heartbeatTimeout, ownResourceID, executorService, scheduledExecutorService, log); + long heartbeatPeriod, + long heartbeatTimeout, + ResourceID ownResourceID, + HeartbeatListener<I, O> heartbeatListener, + ExecutorService executorService, + ScheduledExecutorService scheduledExecutorService, + Logger log) { + super( + heartbeatTimeout, + ownResourceID, + heartbeatListener, + executorService, + scheduledExecutorService, + log); triggerFuture = scheduledExecutorService.scheduleAtFixedRate(this, 0L, heartbeatPeriod, TimeUnit.MILLISECONDS); } http://git-wip-us.apache.org/repos/asf/flink/blob/8765f324/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java index ef953de..ff8249e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java @@ -34,17 +34,17 @@ public interface HeartbeatTarget<I> { * Sends a heartbeat response to the target. Each heartbeat response can carry a payload which * contains additional information for the heartbeat target. * - * @param resourceID Resource ID identifying the machine for which a heartbeat shall be reported. - * @param payload Payload of the heartbeat response. Null indicates an empty payload. + * @param heartbeatOrigin Resource ID identifying the machine for which a heartbeat shall be reported. + * @param heartbeatPayload Payload of the heartbeat. Null indicates an empty payload. */ - void sendHeartbeat(ResourceID resourceID, I payload); + void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload); /** * Requests a heartbeat from the target. Each heartbeat request can carry a payload which * contains additional information for the heartbeat target. * - * @param resourceID Resource ID identifying the machine issuing the heartbeat request. - * @param payload Payload of the heartbeat response. Null indicates an empty payload. + * @param requestOrigin Resource ID identifying the machine issuing the heartbeat request. + * @param heartbeatPayload Payload of the heartbeat request. Null indicates an empty payload. */ - void requestHeartbeat(ResourceID resourceID, I payload); + void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload); } http://git-wip-us.apache.org/repos/asf/flink/blob/8765f324/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java index 1c62f17..0a8923d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java @@ -72,12 +72,11 @@ public class HeartbeatManagerTest extends TestLogger { HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>( heartbeatTimeout, ownResourceID, + heartbeatListener, new DirectExecutorService(), scheduledExecutorService, LOG); - heartbeatManager.start(heartbeatListener); - HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class); heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget); @@ -86,9 +85,9 @@ public class HeartbeatManagerTest extends TestLogger { verify(heartbeatListener, times(1)).reportPayload(targetResourceID, expectedObject); verify(heartbeatListener, times(1)).retrievePayload(); - verify(heartbeatTarget, times(1)).sendHeartbeat(ownResourceID, expectedObject); + verify(heartbeatTarget, times(1)).receiveHeartbeat(ownResourceID, expectedObject); - heartbeatManager.sendHeartbeat(targetResourceID, expectedObject); + heartbeatManager.receiveHeartbeat(targetResourceID, expectedObject); verify(heartbeatListener, times(2)).reportPayload(targetResourceID, expectedObject); } @@ -114,17 +113,16 @@ public class HeartbeatManagerTest extends TestLogger { HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>( heartbeatTimeout, ownResourceID, + heartbeatListener, new DirectExecutorService(), scheduledExecutorService, LOG); - heartbeatManager.start(heartbeatListener); - HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class); heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget); - heartbeatManager.sendHeartbeat(targetResourceID, expectedObject); + heartbeatManager.receiveHeartbeat(targetResourceID, expectedObject); verify(scheduledFuture, times(1)).cancel(true); verify(scheduledExecutorService, times(2)).schedule(any(Runnable.class), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS)); @@ -155,12 +153,11 @@ public class HeartbeatManagerTest extends TestLogger { HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>( heartbeatTimeout, ownResourceID, + heartbeatListener, new DirectExecutorService(), new ScheduledThreadPoolExecutor(1), LOG); - heartbeatManager.start(heartbeatListener); - HeartbeatTarget<Object> heartbeatTarget = mock(HeartbeatTarget.class); Future<ResourceID> timeoutFuture = heartbeatListener.getTimeoutFuture(); @@ -168,7 +165,7 @@ public class HeartbeatManagerTest extends TestLogger { heartbeatManager.monitorTarget(targetResourceID, heartbeatTarget); for (int i = 0; i < numHeartbeats; i++) { - heartbeatManager.sendHeartbeat(targetResourceID, expectedObject); + heartbeatManager.receiveHeartbeat(targetResourceID, expectedObject); Thread.sleep(heartbeatInterval); } @@ -206,6 +203,7 @@ public class HeartbeatManagerTest extends TestLogger { HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>( heartbeatTimeout, resourceID, + heartbeatListener, new DirectExecutorService(), new ScheduledThreadPoolExecutor(1), LOG); @@ -214,13 +212,11 @@ public class HeartbeatManagerTest extends TestLogger { heartbeatPeriod, heartbeatTimeout, resourceID2, + heartbeatListener2, new DirectExecutorService(), new ScheduledThreadPoolExecutor(1), LOG);; - heartbeatManager.start(heartbeatListener); - heartbeatManager2.start(heartbeatListener2); - heartbeatManager.monitorTarget(resourceID2, heartbeatManager2); heartbeatManager2.monitorTarget(resourceID, heartbeatManager); @@ -251,17 +247,16 @@ public class HeartbeatManagerTest extends TestLogger { ResourceID targetID = new ResourceID("target"); Object object = new Object(); + TestingHeartbeatListener heartbeatListener = new TestingHeartbeatListener(object); + HeartbeatManager<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>( heartbeatTimeout, resourceID, + heartbeatListener, new DirectExecutorService(), new ScheduledThreadPoolExecutor(1), LOG); - TestingHeartbeatListener heartbeatListener = new TestingHeartbeatListener(object); - - heartbeatManager.start(heartbeatListener); - heartbeatManager.monitorTarget(targetID, mock(HeartbeatTarget.class)); heartbeatManager.unmonitorTarget(targetID);
