Repository: flink Updated Branches: refs/heads/master 4536e9cbe -> c90a757b2
[FLINK-8942][runtime] Pass heartbeat target ResourceID received payload field now volatile Add HeartbeatMonitor#getHeartbeatTargetId This closes #5699. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9fbbc3a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9fbbc3a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9fbbc3a Branch: refs/heads/master Commit: f9fbbc3a137276cab4b8abf272199f1cd4633d29 Parents: 4536e9c Author: zentol <[email protected]> Authored: Wed Mar 14 14:21:27 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Mar 20 14:53:43 2018 +0100 ---------------------------------------------------------------------- .../runtime/heartbeat/HeartbeatListener.java | 3 +- .../runtime/heartbeat/HeartbeatManagerImpl.java | 8 +- .../heartbeat/HeartbeatManagerSenderImpl.java | 2 +- .../flink/runtime/jobmaster/JobMaster.java | 4 +- .../resourcemanager/ResourceManager.java | 4 +- .../runtime/taskexecutor/TaskExecutor.java | 4 +- .../runtime/heartbeat/HeartbeatManagerTest.java | 167 ++++++++++++++++++- 7 files changed, 177 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java index 734eb4c..01a4754 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java @@ -57,7 +57,8 @@ public interface HeartbeatListener<I, O> { * Retrieves the payload value for the next heartbeat message. Since the operation can happen * asynchronously, the result is returned wrapped in a future. * + * @param resourceID Resource ID identifying the receiver of the payload * @return Future containing the next payload for heartbeats */ - CompletableFuture<O> retrievePayload(); + CompletableFuture<O> retrievePayload(ResourceID resourceID); } http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java index 09c4b46..42268fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java @@ -106,7 +106,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> { return heartbeatListener; } - Collection<HeartbeatManagerImpl.HeartbeatMonitor<O>> getHeartbeatTargets() { + Collection<HeartbeatMonitor<O>> getHeartbeatTargets() { return heartbeatTargets.values(); } @@ -202,7 +202,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> { heartbeatListener.reportPayload(requestOrigin, heartbeatPayload); } - CompletableFuture<O> futurePayload = heartbeatListener.retrievePayload(); + CompletableFuture<O> futurePayload = heartbeatListener.retrievePayload(requestOrigin); if (futurePayload != null) { CompletableFuture<Void> sendHeartbeatFuture = futurePayload.thenAcceptAsync( @@ -289,6 +289,10 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> { return heartbeatTarget; } + ResourceID getHeartbeatTargetId() { + return resourceID; + } + public long getLastHeartbeat() { return lastHeartbeat; } http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java index eb82343..e3b939c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java @@ -63,7 +63,7 @@ public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> if (!stopped) { log.debug("Trigger heartbeat request."); for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets()) { - CompletableFuture<O> futurePayload = getHeartbeatListener().retrievePayload(); + CompletableFuture<O> futurePayload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId()); final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget(); if (futurePayload != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index ced8c7c..f0b29bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -1527,7 +1527,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast } @Override - public CompletableFuture<Void> retrievePayload() { + public CompletableFuture<Void> retrievePayload(ResourceID resourceID) { return CompletableFuture.completedFuture(null); } } @@ -1551,7 +1551,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast } @Override - public CompletableFuture<Void> retrievePayload() { + public CompletableFuture<Void> retrievePayload(ResourceID resourceID) { return CompletableFuture.completedFuture(null); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 77e4362..0ae4ab6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -1076,7 +1076,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> } @Override - public CompletableFuture<Void> retrievePayload() { + public CompletableFuture<Void> retrievePayload(ResourceID resourceID) { return CompletableFuture.completedFuture(null); } } @@ -1109,7 +1109,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> } @Override - public CompletableFuture<Void> retrievePayload() { + public CompletableFuture<Void> retrievePayload(ResourceID resourceID) { return CompletableFuture.completedFuture(null); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index fc69984..7409175 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -1515,7 +1515,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } @Override - public CompletableFuture<Void> retrievePayload() { + public CompletableFuture<Void> retrievePayload(ResourceID resourceID) { return CompletableFuture.completedFuture(null); } } @@ -1544,7 +1544,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } @Override - public CompletableFuture<SlotReport> retrievePayload() { + public CompletableFuture<SlotReport> retrievePayload(ResourceID resourceID) { return callAsync( () -> taskSlotTable.createSlotReport(getResourceID()), taskManagerConfiguration.getTimeout()); http://git-wip-us.apache.org/repos/asf/flink/blob/f9fbbc3a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java index 390a131..77d12d5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.heartbeat; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.ScheduledExecutor; @@ -75,7 +76,7 @@ public class HeartbeatManagerTest extends TestLogger { Object expectedObject = new Object(); - when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(expectedObject)); + when(heartbeatListener.retrievePayload(any(ResourceID.class))).thenReturn(CompletableFuture.completedFuture(expectedObject)); HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>( heartbeatTimeout, @@ -93,7 +94,7 @@ public class HeartbeatManagerTest extends TestLogger { heartbeatManager.requestHeartbeat(targetResourceID, expectedObject); verify(heartbeatListener, times(1)).reportPayload(targetResourceID, expectedObject); - verify(heartbeatListener, times(1)).retrievePayload(); + verify(heartbeatListener, times(1)).retrievePayload(any(ResourceID.class)); verify(heartbeatTarget, times(1)).receiveHeartbeat(ownResourceID, expectedObject); heartbeatManager.receiveHeartbeat(targetResourceID, expectedObject); @@ -118,7 +119,7 @@ public class HeartbeatManagerTest extends TestLogger { Object expectedObject = new Object(); - when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(expectedObject)); + when(heartbeatListener.retrievePayload(any(ResourceID.class))).thenReturn(CompletableFuture.completedFuture(expectedObject)); HeartbeatManagerImpl<Object, Object> heartbeatManager = new HeartbeatManagerImpl<>( heartbeatTimeout, @@ -207,7 +208,7 @@ public class HeartbeatManagerTest extends TestLogger { @SuppressWarnings("unchecked") HeartbeatListener<Object, Object> heartbeatListener = mock(HeartbeatListener.class); - when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(object)); + when(heartbeatListener.retrievePayload(any(ResourceID.class))).thenReturn(CompletableFuture.completedFuture(object)); TestingHeartbeatListener heartbeatListener2 = new TestingHeartbeatListener(object2); @@ -347,6 +348,162 @@ public class HeartbeatManagerTest extends TestLogger { } } + /** + * Tests that the heartbeat target {@link ResourceID} is properly passed to the {@link HeartbeatListener} by the + * {@link HeartbeatManagerImpl}. + */ + @Test + public void testHeartbeatManagerTargetPayload() { + final long heartbeatTimeout = 100L; + + final ResourceID someTargetId = ResourceID.generate(); + final ResourceID specialTargetId = ResourceID.generate(); + final TargetDependentHeartbeatReceiver someHeartbeatTarget = new TargetDependentHeartbeatReceiver(); + final TargetDependentHeartbeatReceiver specialHeartbeatTarget = new TargetDependentHeartbeatReceiver(); + + final int defaultResponse = 0; + final int specialResponse = 1; + + HeartbeatManager<?, Integer> heartbeatManager = new HeartbeatManagerImpl<>( + heartbeatTimeout, + ResourceID.generate(), + new TargetDependentHeartbeatSender(specialTargetId, specialResponse, defaultResponse), + Executors.directExecutor(), + mock(ScheduledExecutor.class), + LOG); + + try { + heartbeatManager.monitorTarget(someTargetId, someHeartbeatTarget); + heartbeatManager.monitorTarget(specialTargetId, specialHeartbeatTarget); + + heartbeatManager.requestHeartbeat(someTargetId, null); + assertEquals(defaultResponse, someHeartbeatTarget.getLastReceivedHeartbeatPayload()); + + heartbeatManager.requestHeartbeat(specialTargetId, null); + assertEquals(specialResponse, specialHeartbeatTarget.getLastReceivedHeartbeatPayload()); + } finally { + heartbeatManager.stop(); + } + } + + /** + * Tests that the heartbeat target {@link ResourceID} is properly passed to the {@link HeartbeatListener} by the + * {@link HeartbeatManagerSenderImpl}. + */ + @Test + public void testHeartbeatManagerSenderTargetPayload() throws Exception { + final long heartbeatTimeout = 100L; + final long heartbeatPeriod = 2000L; + + final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); + + final ResourceID someTargetId = ResourceID.generate(); + final ResourceID specialTargetId = ResourceID.generate(); + + final OneShotLatch someTargetReceivedLatch = new OneShotLatch(); + final OneShotLatch specialTargetReceivedLatch = new OneShotLatch(); + + final TargetDependentHeartbeatReceiver someHeartbeatTarget = new TargetDependentHeartbeatReceiver(someTargetReceivedLatch); + final TargetDependentHeartbeatReceiver specialHeartbeatTarget = new TargetDependentHeartbeatReceiver(specialTargetReceivedLatch); + + final int defaultResponse = 0; + final int specialResponse = 1; + + HeartbeatManager<?, Integer> heartbeatManager = new HeartbeatManagerSenderImpl<>( + heartbeatPeriod, + heartbeatTimeout, + ResourceID.generate(), + new TargetDependentHeartbeatSender(specialTargetId, specialResponse, defaultResponse), + Executors.directExecutor(), + new ScheduledExecutorServiceAdapter(scheduledThreadPoolExecutor), + LOG); + + try { + heartbeatManager.monitorTarget(someTargetId, someHeartbeatTarget); + heartbeatManager.monitorTarget(specialTargetId, specialHeartbeatTarget); + + someTargetReceivedLatch.await(5, TimeUnit.SECONDS); + specialTargetReceivedLatch.await(5, TimeUnit.SECONDS); + + assertEquals(defaultResponse, someHeartbeatTarget.getLastRequestedHeartbeatPayload()); + assertEquals(specialResponse, specialHeartbeatTarget.getLastRequestedHeartbeatPayload()); + } finally { + heartbeatManager.stop(); + scheduledThreadPoolExecutor.shutdown(); + } + } + + /** + * Test {@link HeartbeatTarget} that exposes the last received payload. + */ + private static class TargetDependentHeartbeatReceiver implements HeartbeatTarget<Integer> { + + private volatile int lastReceivedHeartbeatPayload = -1; + private volatile int lastRequestedHeartbeatPayload = -1; + + private final OneShotLatch latch; + + public TargetDependentHeartbeatReceiver() { + this(new OneShotLatch()); + } + + public TargetDependentHeartbeatReceiver(OneShotLatch latch) { + this.latch = latch; + } + + @Override + public void receiveHeartbeat(ResourceID heartbeatOrigin, Integer heartbeatPayload) { + this.lastReceivedHeartbeatPayload = heartbeatPayload; + latch.trigger(); + } + + @Override + public void requestHeartbeat(ResourceID requestOrigin, Integer heartbeatPayload) { + this.lastRequestedHeartbeatPayload = heartbeatPayload; + latch.trigger(); + } + + public int getLastReceivedHeartbeatPayload() { + return lastReceivedHeartbeatPayload; + } + + public int getLastRequestedHeartbeatPayload() { + return lastRequestedHeartbeatPayload; + } + } + + /** + * Test {@link HeartbeatListener} that returns different payloads based on the target {@link ResourceID}. + */ + private static class TargetDependentHeartbeatSender implements HeartbeatListener<Object, Integer> { + private final ResourceID specialId; + private final int specialResponse; + private final int defaultResponse; + + TargetDependentHeartbeatSender(ResourceID specialId, int specialResponse, int defaultResponse) { + this.specialId = specialId; + this.specialResponse = specialResponse; + this.defaultResponse = defaultResponse; + } + + @Override + public void notifyHeartbeatTimeout(ResourceID resourceID) { + } + + @Override + public void reportPayload(ResourceID resourceID, Object payload) { + } + + @Override + public CompletableFuture<Integer> retrievePayload(ResourceID resourceID) { + if (resourceID.equals(specialId)) { + return CompletableFuture.completedFuture(specialResponse); + } else { + return CompletableFuture.completedFuture(defaultResponse); + } + } + } + static class TestingHeartbeatListener implements HeartbeatListener<Object, Object> { private final CompletableFuture<ResourceID> future = new CompletableFuture<>(); @@ -378,7 +535,7 @@ public class HeartbeatManagerTest extends TestLogger { } @Override - public CompletableFuture<Object> retrievePayload() { + public CompletableFuture<Object> retrievePayload(ResourceID resourceID) { return CompletableFuture.completedFuture(payload); } }
