Repository: flink
Updated Branches:
  refs/heads/master 4536e9cbe -> c90a757b2


[FLINK-8942][runtime] Pass heartbeat target ResourceID

Received payload field is now volatile.

Add HeartbeatMonitor#getHeartbeatTargetId

This closes #5699.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9fbbc3a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9fbbc3a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9fbbc3a

Branch: refs/heads/master
Commit: f9fbbc3a137276cab4b8abf272199f1cd4633d29
Parents: 4536e9c
Author: zentol <[email protected]>
Authored: Wed Mar 14 14:21:27 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Mar 20 14:53:43 2018 +0100

----------------------------------------------------------------------
 .../runtime/heartbeat/HeartbeatListener.java    |   3 +-
 .../runtime/heartbeat/HeartbeatManagerImpl.java |   8 +-
 .../heartbeat/HeartbeatManagerSenderImpl.java   |   2 +-
 .../flink/runtime/jobmaster/JobMaster.java      |   4 +-
 .../resourcemanager/ResourceManager.java        |   4 +-
 .../runtime/taskexecutor/TaskExecutor.java      |   4 +-
 .../runtime/heartbeat/HeartbeatManagerTest.java | 167 ++++++++++++++++++-
 7 files changed, 177 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
index 734eb4c..01a4754 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
@@ -57,7 +57,8 @@ public interface HeartbeatListener<I, O> {
         * Retrieves the payload value for the next heartbeat message. Since 
the operation can happen
         * asynchronously, the result is returned wrapped in a future.
         *
+        * @param resourceID Resource ID identifying the receiver of the payload
         * @return Future containing the next payload for heartbeats
         */
-       CompletableFuture<O> retrievePayload();
+       CompletableFuture<O> retrievePayload(ResourceID resourceID);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index 09c4b46..42268fc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -106,7 +106,7 @@ public class HeartbeatManagerImpl<I, O> implements 
HeartbeatManager<I, O> {
                return heartbeatListener;
        }
 
-       Collection<HeartbeatManagerImpl.HeartbeatMonitor<O>> 
getHeartbeatTargets() {
+       Collection<HeartbeatMonitor<O>> getHeartbeatTargets() {
                return heartbeatTargets.values();
        }
 
@@ -202,7 +202,7 @@ public class HeartbeatManagerImpl<I, O> implements 
HeartbeatManager<I, O> {
                                        
heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
                                }
 
-                               CompletableFuture<O> futurePayload = 
heartbeatListener.retrievePayload();
+                               CompletableFuture<O> futurePayload = 
heartbeatListener.retrievePayload(requestOrigin);
 
                                if (futurePayload != null) {
                                        CompletableFuture<Void> 
sendHeartbeatFuture = futurePayload.thenAcceptAsync(
@@ -289,6 +289,10 @@ public class HeartbeatManagerImpl<I, O> implements 
HeartbeatManager<I, O> {
                        return heartbeatTarget;
                }
 
+               ResourceID getHeartbeatTargetId() {
+                       return resourceID;
+               }
+
                public long getLastHeartbeat() {
                        return lastHeartbeat;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
index eb82343..e3b939c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
@@ -63,7 +63,7 @@ public class HeartbeatManagerSenderImpl<I, O> extends 
HeartbeatManagerImpl<I, O>
                if (!stopped) {
                        log.debug("Trigger heartbeat request.");
                        for (HeartbeatMonitor<O> heartbeatMonitor : 
getHeartbeatTargets()) {
-                               CompletableFuture<O> futurePayload = 
getHeartbeatListener().retrievePayload();
+                               CompletableFuture<O> futurePayload = 
getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
                                final HeartbeatTarget<O> heartbeatTarget = 
heartbeatMonitor.getHeartbeatTarget();
 
                                if (futurePayload != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index ced8c7c..f0b29bf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1527,7 +1527,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                }
 
                @Override
-               public CompletableFuture<Void> retrievePayload() {
+               public CompletableFuture<Void> retrievePayload(ResourceID 
resourceID) {
                        return CompletableFuture.completedFuture(null);
                }
        }
@@ -1551,7 +1551,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                }
 
                @Override
-               public CompletableFuture<Void> retrievePayload() {
+               public CompletableFuture<Void> retrievePayload(ResourceID 
resourceID) {
                        return CompletableFuture.completedFuture(null);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 77e4362..0ae4ab6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -1076,7 +1076,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                }
 
                @Override
-               public CompletableFuture<Void> retrievePayload() {
+               public CompletableFuture<Void> retrievePayload(ResourceID 
resourceID) {
                        return CompletableFuture.completedFuture(null);
                }
        }
@@ -1109,7 +1109,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                }
 
                @Override
-               public CompletableFuture<Void> retrievePayload() {
+               public CompletableFuture<Void> retrievePayload(ResourceID 
resourceID) {
                        return CompletableFuture.completedFuture(null);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index fc69984..7409175 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1515,7 +1515,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                }
 
                @Override
-               public CompletableFuture<Void> retrievePayload() {
+               public CompletableFuture<Void> retrievePayload(ResourceID 
resourceID) {
                        return CompletableFuture.completedFuture(null);
                }
        }
@@ -1544,7 +1544,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                }
 
                @Override
-               public CompletableFuture<SlotReport> retrievePayload() {
+               public CompletableFuture<SlotReport> retrievePayload(ResourceID 
resourceID) {
                        return callAsync(
                                        () -> 
taskSlotTable.createSlotReport(getResourceID()),
                                        taskManagerConfiguration.getTimeout());

http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
index 390a131..77d12d5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.heartbeat;
 
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -75,7 +76,7 @@ public class HeartbeatManagerTest extends TestLogger {
 
                Object expectedObject = new Object();
 
-               
when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(expectedObject));
+               
when(heartbeatListener.retrievePayload(any(ResourceID.class))).thenReturn(CompletableFuture.completedFuture(expectedObject));
 
                HeartbeatManagerImpl<Object, Object> heartbeatManager = new 
HeartbeatManagerImpl<>(
                        heartbeatTimeout,
@@ -93,7 +94,7 @@ public class HeartbeatManagerTest extends TestLogger {
                heartbeatManager.requestHeartbeat(targetResourceID, 
expectedObject);
 
                verify(heartbeatListener, 
times(1)).reportPayload(targetResourceID, expectedObject);
-               verify(heartbeatListener, times(1)).retrievePayload();
+               verify(heartbeatListener, 
times(1)).retrievePayload(any(ResourceID.class));
                verify(heartbeatTarget, 
times(1)).receiveHeartbeat(ownResourceID, expectedObject);
 
                heartbeatManager.receiveHeartbeat(targetResourceID, 
expectedObject);
@@ -118,7 +119,7 @@ public class HeartbeatManagerTest extends TestLogger {
 
                Object expectedObject = new Object();
 
-               
when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(expectedObject));
+               
when(heartbeatListener.retrievePayload(any(ResourceID.class))).thenReturn(CompletableFuture.completedFuture(expectedObject));
 
                HeartbeatManagerImpl<Object, Object> heartbeatManager = new 
HeartbeatManagerImpl<>(
                        heartbeatTimeout,
@@ -207,7 +208,7 @@ public class HeartbeatManagerTest extends TestLogger {
                @SuppressWarnings("unchecked")
                HeartbeatListener<Object, Object> heartbeatListener = 
mock(HeartbeatListener.class);
 
-               
when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(object));
+               
when(heartbeatListener.retrievePayload(any(ResourceID.class))).thenReturn(CompletableFuture.completedFuture(object));
 
                TestingHeartbeatListener heartbeatListener2 = new 
TestingHeartbeatListener(object2);
 
@@ -347,6 +348,162 @@ public class HeartbeatManagerTest extends TestLogger {
                }
        }
 
+       /**
+        * Tests that the heartbeat target {@link ResourceID} is properly 
passed to the {@link HeartbeatListener} by the
+        * {@link HeartbeatManagerImpl}.
+        */
+       @Test
+       public void testHeartbeatManagerTargetPayload() {
+               final long heartbeatTimeout = 100L;
+
+               final ResourceID someTargetId = ResourceID.generate();
+               final ResourceID specialTargetId = ResourceID.generate();
+               final TargetDependentHeartbeatReceiver someHeartbeatTarget = 
new TargetDependentHeartbeatReceiver();
+               final TargetDependentHeartbeatReceiver specialHeartbeatTarget = 
new TargetDependentHeartbeatReceiver();
+
+               final int defaultResponse = 0;
+               final int specialResponse = 1;
+
+               HeartbeatManager<?, Integer> heartbeatManager = new 
HeartbeatManagerImpl<>(
+                       heartbeatTimeout,
+                       ResourceID.generate(),
+                       new TargetDependentHeartbeatSender(specialTargetId, 
specialResponse, defaultResponse),
+                       Executors.directExecutor(),
+                       mock(ScheduledExecutor.class),
+                       LOG);
+
+               try {
+                       heartbeatManager.monitorTarget(someTargetId, 
someHeartbeatTarget);
+                       heartbeatManager.monitorTarget(specialTargetId, 
specialHeartbeatTarget);
+
+                       heartbeatManager.requestHeartbeat(someTargetId, null);
+                       assertEquals(defaultResponse, 
someHeartbeatTarget.getLastReceivedHeartbeatPayload());
+
+                       heartbeatManager.requestHeartbeat(specialTargetId, 
null);
+                       assertEquals(specialResponse, 
specialHeartbeatTarget.getLastReceivedHeartbeatPayload());
+               } finally {
+                       heartbeatManager.stop();
+               }
+       }
+
+       /**
+        * Tests that the heartbeat target {@link ResourceID} is properly 
passed to the {@link HeartbeatListener} by the
+        * {@link HeartbeatManagerSenderImpl}.
+        */
+       @Test
+       public void testHeartbeatManagerSenderTargetPayload() throws Exception {
+               final long heartbeatTimeout = 100L;
+               final long heartbeatPeriod = 2000L;
+
+               final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = 
new ScheduledThreadPoolExecutor(1);
+
+               final ResourceID someTargetId = ResourceID.generate();
+               final ResourceID specialTargetId = ResourceID.generate();
+
+               final OneShotLatch someTargetReceivedLatch = new OneShotLatch();
+               final OneShotLatch specialTargetReceivedLatch = new 
OneShotLatch();
+
+               final TargetDependentHeartbeatReceiver someHeartbeatTarget = 
new TargetDependentHeartbeatReceiver(someTargetReceivedLatch);
+               final TargetDependentHeartbeatReceiver specialHeartbeatTarget = 
new TargetDependentHeartbeatReceiver(specialTargetReceivedLatch);
+
+               final int defaultResponse = 0;
+               final int specialResponse = 1;
+
+               HeartbeatManager<?, Integer> heartbeatManager = new 
HeartbeatManagerSenderImpl<>(
+                       heartbeatPeriod,
+                       heartbeatTimeout,
+                       ResourceID.generate(),
+                       new TargetDependentHeartbeatSender(specialTargetId, 
specialResponse, defaultResponse),
+                       Executors.directExecutor(),
+                       new 
ScheduledExecutorServiceAdapter(scheduledThreadPoolExecutor),
+                       LOG);
+
+               try {
+                       heartbeatManager.monitorTarget(someTargetId, 
someHeartbeatTarget);
+                       heartbeatManager.monitorTarget(specialTargetId, 
specialHeartbeatTarget);
+
+                       someTargetReceivedLatch.await(5, TimeUnit.SECONDS);
+                       specialTargetReceivedLatch.await(5, TimeUnit.SECONDS);
+
+                       assertEquals(defaultResponse, 
someHeartbeatTarget.getLastRequestedHeartbeatPayload());
+                       assertEquals(specialResponse, 
specialHeartbeatTarget.getLastRequestedHeartbeatPayload());
+               } finally {
+                       heartbeatManager.stop();
+                       scheduledThreadPoolExecutor.shutdown();
+               }
+       }
+
+       /**
+        * Test {@link HeartbeatTarget} that exposes the last received and last requested payloads.
+        */
+       private static class TargetDependentHeartbeatReceiver implements 
HeartbeatTarget<Integer> {
+
+               private volatile int lastReceivedHeartbeatPayload = -1;
+               private volatile int lastRequestedHeartbeatPayload = -1;
+
+               private final OneShotLatch latch;
+
+               public TargetDependentHeartbeatReceiver() {
+                       this(new OneShotLatch());
+               }
+
+               public TargetDependentHeartbeatReceiver(OneShotLatch latch) {
+                       this.latch = latch;
+               }
+
+               @Override
+               public void receiveHeartbeat(ResourceID heartbeatOrigin, 
Integer heartbeatPayload) {
+                       this.lastReceivedHeartbeatPayload = heartbeatPayload;
+                       latch.trigger();
+               }
+
+               @Override
+               public void requestHeartbeat(ResourceID requestOrigin, Integer 
heartbeatPayload) {
+                       this.lastRequestedHeartbeatPayload = heartbeatPayload;
+                       latch.trigger();
+               }
+
+               public int getLastReceivedHeartbeatPayload() {
+                       return lastReceivedHeartbeatPayload;
+               }
+
+               public int getLastRequestedHeartbeatPayload() {
+                       return lastRequestedHeartbeatPayload;
+               }
+       }
+
+       /**
+        * Test {@link HeartbeatListener} that returns different payloads based 
on the target {@link ResourceID}.
+        */
+       private static class TargetDependentHeartbeatSender implements 
HeartbeatListener<Object, Integer>  {
+               private final ResourceID specialId;
+               private final int specialResponse;
+               private final int defaultResponse;
+
+               TargetDependentHeartbeatSender(ResourceID specialId, int 
specialResponse, int defaultResponse) {
+                       this.specialId = specialId;
+                       this.specialResponse = specialResponse;
+                       this.defaultResponse = defaultResponse;
+               }
+
+               @Override
+               public void notifyHeartbeatTimeout(ResourceID resourceID) {
+               }
+
+               @Override
+               public void reportPayload(ResourceID resourceID, Object 
payload) {
+               }
+
+               @Override
+               public CompletableFuture<Integer> retrievePayload(ResourceID 
resourceID) {
+                       if (resourceID.equals(specialId)) {
+                               return 
CompletableFuture.completedFuture(specialResponse);
+                       } else {
+                               return 
CompletableFuture.completedFuture(defaultResponse);
+                       }
+               }
+       }
+
        static class TestingHeartbeatListener implements 
HeartbeatListener<Object, Object> {
 
                private final CompletableFuture<ResourceID> future = new 
CompletableFuture<>();
@@ -378,7 +535,7 @@ public class HeartbeatManagerTest extends TestLogger {
                }
 
                @Override
-               public CompletableFuture<Object> retrievePayload() {
+               public CompletableFuture<Object> retrievePayload(ResourceID 
resourceID) {
                        return CompletableFuture.completedFuture(payload);
                }
        }

Reply via email to