Repository: flink
Updated Branches:
  refs/heads/master 713c092b2 -> 8765f3241


[FLINK-6076] Refactor HeartbeatManager to extend HeartbeatTarget

Remove start method from HeartbeatManager

This closes #3555.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8765f324
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8765f324
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8765f324

Branch: refs/heads/master
Commit: 8765f324108c6cf20569c271776594b314aa91b7
Parents: 713c092
Author: Till Rohrmann <[email protected]>
Authored: Mon Mar 13 16:52:56 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Mar 17 15:52:02 2017 +0100

----------------------------------------------------------------------
 .../runtime/heartbeat/HeartbeatManager.java     | 11 +---
 .../runtime/heartbeat/HeartbeatManagerImpl.java | 55 +++++++++-----------
 .../heartbeat/HeartbeatManagerSenderImpl.java   | 21 +++++---
 .../runtime/heartbeat/HeartbeatTarget.java      | 12 ++---
 .../runtime/heartbeat/HeartbeatManagerTest.java | 29 +++++------
 5 files changed, 57 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8765f324/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
index 12918ed..a648b1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManager.java
@@ -32,7 +32,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
  * @param <I> Type of the incoming payload
  * @param <O> Type of the outgoing payload
  */
-public interface HeartbeatManager<I, O> {
+public interface HeartbeatManager<I, O> extends HeartbeatTarget<I> {
 
        /**
         * Start monitoring a {@link HeartbeatTarget}. Heartbeat timeouts for 
this target are reported
@@ -52,15 +52,6 @@ public interface HeartbeatManager<I, O> {
        void unmonitorTarget(ResourceID resourceID);
 
        /**
-        * Starts the heartbeat manager with the given {@link 
HeartbeatListener}. The heartbeat listener
-        * is notified about heartbeat timeouts and heartbeat payloads are 
reported and retrieved to
-        * and from it.
-        *
-        * @param heartbeatListener Heartbeat listener associated with the 
heartbeat manager
-        */
-       void start(HeartbeatListener<I, O> heartbeatListener);
-
-       /**
         * Stops the heartbeat manager.
         */
        void stop();

http://git-wip-us.apache.org/repos/asf/flink/blob/8765f324/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index 042f95b..42b1c85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -44,7 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * @param <O> Type of the outgoing heartbeat payload
  */
 @ThreadSafe
-public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O>, 
HeartbeatTarget<I> {
+public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 
        /** Heartbeat timeout interval in milli seconds */
        private final long heartbeatTimeoutIntervalMs;
@@ -52,6 +52,9 @@ public class HeartbeatManagerImpl<I, O> implements 
HeartbeatManager<I, O>, Heart
        /** Resource ID which is used to mark one own's heartbeat signals */
        private final ResourceID ownResourceID;
 
+       /** Heartbeat listener with which the heartbeat manager has been 
associated */
+       private final HeartbeatListener<I, O> heartbeatListener;
+
        /** Executor service used to run heartbeat timeout notifications */
        private final ScheduledExecutorService scheduledExecutorService;
 
@@ -63,28 +66,27 @@ public class HeartbeatManagerImpl<I, O> implements 
HeartbeatManager<I, O>, Heart
        /** Execution context used to run future callbacks */
        private final Executor executor;
 
-       /** Heartbeat listener with which the heartbeat manager has been 
associated */
-       private HeartbeatListener<I, O> heartbeatListener;
-
        /** Running state of the heartbeat manager */
        protected volatile boolean stopped;
 
        public HeartbeatManagerImpl(
-               long heartbeatTimeoutIntervalMs,
-               ResourceID ownResourceID,
-               Executor executor,
-               ScheduledExecutorService scheduledExecutorService,
-               Logger log) {
+                       long heartbeatTimeoutIntervalMs,
+                       ResourceID ownResourceID,
+                       HeartbeatListener<I, O> heartbeatListener,
+                       Executor executor,
+                       ScheduledExecutorService scheduledExecutorService,
+                       Logger log) {
                Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, 
"The heartbeat timeout has to be larger than 0.");
 
                this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
                this.ownResourceID = Preconditions.checkNotNull(ownResourceID);
+               this.heartbeatListener = 
Preconditions.checkNotNull(heartbeatListener, "heartbeatListener");
                this.scheduledExecutorService = 
Preconditions.checkNotNull(scheduledExecutorService);
                this.log = Preconditions.checkNotNull(log);
                this.executor = Preconditions.checkNotNull(executor);
                this.heartbeatTargets = new ConcurrentHashMap<>(16);
 
-               stopped = true;
+               stopped = false;
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -150,15 +152,6 @@ public class HeartbeatManagerImpl<I, O> implements 
HeartbeatManager<I, O>, Heart
        }
 
        @Override
-       public void start(HeartbeatListener<I, O> heartbeatListener) {
-               Preconditions.checkState(stopped, "Cannot start an already 
started heartbeat manager.");
-
-               stopped = false;
-
-               this.heartbeatListener = 
Preconditions.checkNotNull(heartbeatListener);
-       }
-
-       @Override
        public void stop() {
                stopped = true;
 
@@ -174,27 +167,27 @@ public class HeartbeatManagerImpl<I, O> implements 
HeartbeatManager<I, O>, Heart
        
//----------------------------------------------------------------------------------------------
 
        @Override
-       public void sendHeartbeat(ResourceID resourceID, I payload) {
+       public void receiveHeartbeat(ResourceID heartbeatOrigin, I 
heartbeatPayload) {
                if (!stopped) {
-                       log.debug("Received heartbeat from {}.", resourceID);
-                       reportHeartbeat(resourceID);
+                       log.debug("Received heartbeat from {}.", 
heartbeatOrigin);
+                       reportHeartbeat(heartbeatOrigin);
 
-                       if (payload != null) {
-                               heartbeatListener.reportPayload(resourceID, 
payload);
+                       if (heartbeatPayload != null) {
+                               
heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
                        }
                }
        }
 
        @Override
-       public void requestHeartbeat(ResourceID resourceID, I payload) {
+       public void requestHeartbeat(ResourceID requestOrigin, I 
heartbeatPayload) {
                if (!stopped) {
-                       log.debug("Received heartbeat request from {}.", 
resourceID);
+                       log.debug("Received heartbeat request from {}.", 
requestOrigin);
 
-                       final HeartbeatTarget<O> heartbeatTarget = 
reportHeartbeat(resourceID);
+                       final HeartbeatTarget<O> heartbeatTarget = 
reportHeartbeat(requestOrigin);
 
                        if (heartbeatTarget != null) {
-                               if (payload != null) {
-                                       
heartbeatListener.reportPayload(resourceID, payload);
+                               if (heartbeatPayload != null) {
+                                       
heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
                                }
 
                                Future<O> futurePayload = 
heartbeatListener.retrievePayload();
@@ -203,11 +196,11 @@ public class HeartbeatManagerImpl<I, O> implements 
HeartbeatManager<I, O>, Heart
                                        futurePayload.thenAcceptAsync(new 
AcceptFunction<O>() {
                                                @Override
                                                public void accept(O 
retrievedPayload) {
-                                                       
heartbeatTarget.sendHeartbeat(getOwnResourceID(), retrievedPayload);
+                                                       
heartbeatTarget.receiveHeartbeat(getOwnResourceID(), retrievedPayload);
                                                }
                                        }, executor);
                                } else {
-                                       
heartbeatTarget.sendHeartbeat(ownResourceID, null);
+                                       
heartbeatTarget.receiveHeartbeat(ownResourceID, null);
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8765f324/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
index 588ba7f..57c8671 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
@@ -40,13 +40,20 @@ public class HeartbeatManagerSenderImpl<I, O> extends 
HeartbeatManagerImpl<I, O>
        private final ScheduledFuture<?> triggerFuture;
 
        public HeartbeatManagerSenderImpl(
-               long heartbeatPeriod,
-               long heartbeatTimeout,
-               ResourceID ownResourceID,
-               ExecutorService executorService,
-               ScheduledExecutorService scheduledExecutorService,
-               Logger log) {
-               super(heartbeatTimeout, ownResourceID, executorService, 
scheduledExecutorService, log);
+                       long heartbeatPeriod,
+                       long heartbeatTimeout,
+                       ResourceID ownResourceID,
+                       HeartbeatListener<I, O> heartbeatListener,
+                       ExecutorService executorService,
+                       ScheduledExecutorService scheduledExecutorService,
+                       Logger log) {
+               super(
+                       heartbeatTimeout,
+                       ownResourceID,
+                       heartbeatListener,
+                       executorService,
+                       scheduledExecutorService,
+                       log);
 
                triggerFuture = 
scheduledExecutorService.scheduleAtFixedRate(this, 0L, heartbeatPeriod, 
TimeUnit.MILLISECONDS);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8765f324/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java
index ef953de..ff8249e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatTarget.java
@@ -34,17 +34,17 @@ public interface HeartbeatTarget<I> {
         * Sends a heartbeat response to the target. Each heartbeat response 
can carry a payload which
         * contains additional information for the heartbeat target.
         *
-        * @param resourceID Resource ID identifying the machine for which a 
heartbeat shall be reported.
-        * @param payload Payload of the heartbeat response. Null indicates an 
empty payload.
+        * @param heartbeatOrigin Resource ID identifying the machine for which 
a heartbeat shall be reported.
+        * @param heartbeatPayload Payload of the heartbeat. Null indicates an 
empty payload.
         */
-       void sendHeartbeat(ResourceID resourceID, I payload);
+       void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload);
 
        /**
         * Requests a heartbeat from the target. Each heartbeat request can 
carry a payload which
         * contains additional information for the heartbeat target.
         *
-        * @param resourceID Resource ID identifying the machine issuing the 
heartbeat request.
-        * @param payload Payload of the heartbeat response. Null indicates an 
empty payload.
+        * @param requestOrigin Resource ID identifying the machine issuing the 
heartbeat request.
+        * @param heartbeatPayload Payload of the heartbeat request. Null 
indicates an empty payload.
         */
-       void requestHeartbeat(ResourceID resourceID, I payload);
+       void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8765f324/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
index 1c62f17..0a8923d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
@@ -72,12 +72,11 @@ public class HeartbeatManagerTest extends TestLogger {
                HeartbeatManagerImpl<Object, Object> heartbeatManager = new 
HeartbeatManagerImpl<>(
                        heartbeatTimeout,
                        ownResourceID,
+                       heartbeatListener,
                        new DirectExecutorService(),
                        scheduledExecutorService,
                        LOG);
 
-               heartbeatManager.start(heartbeatListener);
-
                HeartbeatTarget<Object> heartbeatTarget = 
mock(HeartbeatTarget.class);
 
                heartbeatManager.monitorTarget(targetResourceID, 
heartbeatTarget);
@@ -86,9 +85,9 @@ public class HeartbeatManagerTest extends TestLogger {
 
                verify(heartbeatListener, 
times(1)).reportPayload(targetResourceID, expectedObject);
                verify(heartbeatListener, times(1)).retrievePayload();
-               verify(heartbeatTarget, times(1)).sendHeartbeat(ownResourceID, 
expectedObject);
+               verify(heartbeatTarget, 
times(1)).receiveHeartbeat(ownResourceID, expectedObject);
 
-               heartbeatManager.sendHeartbeat(targetResourceID, 
expectedObject);
+               heartbeatManager.receiveHeartbeat(targetResourceID, 
expectedObject);
 
                verify(heartbeatListener, 
times(2)).reportPayload(targetResourceID, expectedObject);
        }
@@ -114,17 +113,16 @@ public class HeartbeatManagerTest extends TestLogger {
                HeartbeatManagerImpl<Object, Object> heartbeatManager = new 
HeartbeatManagerImpl<>(
                        heartbeatTimeout,
                        ownResourceID,
+                       heartbeatListener,
                        new DirectExecutorService(),
                        scheduledExecutorService,
                        LOG);
 
-               heartbeatManager.start(heartbeatListener);
-
                HeartbeatTarget<Object> heartbeatTarget = 
mock(HeartbeatTarget.class);
 
                heartbeatManager.monitorTarget(targetResourceID, 
heartbeatTarget);
 
-               heartbeatManager.sendHeartbeat(targetResourceID, 
expectedObject);
+               heartbeatManager.receiveHeartbeat(targetResourceID, 
expectedObject);
 
                verify(scheduledFuture, times(1)).cancel(true);
                verify(scheduledExecutorService, 
times(2)).schedule(any(Runnable.class), eq(heartbeatTimeout), 
eq(TimeUnit.MILLISECONDS));
@@ -155,12 +153,11 @@ public class HeartbeatManagerTest extends TestLogger {
                HeartbeatManagerImpl<Object, Object> heartbeatManager = new 
HeartbeatManagerImpl<>(
                        heartbeatTimeout,
                        ownResourceID,
+                       heartbeatListener,
                        new DirectExecutorService(),
                        new ScheduledThreadPoolExecutor(1),
                        LOG);
 
-               heartbeatManager.start(heartbeatListener);
-
                HeartbeatTarget<Object> heartbeatTarget = 
mock(HeartbeatTarget.class);
 
                Future<ResourceID> timeoutFuture = 
heartbeatListener.getTimeoutFuture();
@@ -168,7 +165,7 @@ public class HeartbeatManagerTest extends TestLogger {
                heartbeatManager.monitorTarget(targetResourceID, 
heartbeatTarget);
 
                for (int i = 0; i < numHeartbeats; i++) {
-                       heartbeatManager.sendHeartbeat(targetResourceID, 
expectedObject);
+                       heartbeatManager.receiveHeartbeat(targetResourceID, 
expectedObject);
                        Thread.sleep(heartbeatInterval);
                }
 
@@ -206,6 +203,7 @@ public class HeartbeatManagerTest extends TestLogger {
                HeartbeatManagerImpl<Object, Object> heartbeatManager = new 
HeartbeatManagerImpl<>(
                        heartbeatTimeout,
                        resourceID,
+                       heartbeatListener,
                        new DirectExecutorService(),
                        new ScheduledThreadPoolExecutor(1),
                        LOG);
@@ -214,13 +212,11 @@ public class HeartbeatManagerTest extends TestLogger {
                        heartbeatPeriod,
                        heartbeatTimeout,
                        resourceID2,
+                       heartbeatListener2,
                        new DirectExecutorService(),
                        new ScheduledThreadPoolExecutor(1),
                        LOG);;
 
-               heartbeatManager.start(heartbeatListener);
-               heartbeatManager2.start(heartbeatListener2);
-
                heartbeatManager.monitorTarget(resourceID2, heartbeatManager2);
                heartbeatManager2.monitorTarget(resourceID, heartbeatManager);
 
@@ -251,17 +247,16 @@ public class HeartbeatManagerTest extends TestLogger {
                ResourceID targetID = new ResourceID("target");
                Object object = new Object();
 
+               TestingHeartbeatListener heartbeatListener = new 
TestingHeartbeatListener(object);
+
                HeartbeatManager<Object, Object> heartbeatManager = new 
HeartbeatManagerImpl<>(
                        heartbeatTimeout,
                        resourceID,
+                       heartbeatListener,
                        new DirectExecutorService(),
                        new ScheduledThreadPoolExecutor(1),
                        LOG);
 
-               TestingHeartbeatListener heartbeatListener = new 
TestingHeartbeatListener(object);
-
-               heartbeatManager.start(heartbeatListener);
-
                heartbeatManager.monitorTarget(targetID, 
mock(HeartbeatTarget.class));
 
                heartbeatManager.unmonitorTarget(targetID);

Reply via email to