Repository: flink Updated Branches: refs/heads/master a30355356 -> ffb056072
[FLINK-6050] [robustness] Register exception handler on thenAccept futures When applying an AcceptFunction on a Future x, we should register the exception handler on the returned thenAccept future instead of on x. This has the advantage that we also catch exceptions which are thrown inside of the AcceptFunction and not only those which originate from x. The PR adapts the code accordingly. This closes #3537. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ffb05607 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ffb05607 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ffb05607 Branch: refs/heads/master Commit: ffb0560728df53ac61d77a029562dbd5c0da8d75 Parents: a303553 Author: Till Rohrmann <[email protected]> Authored: Tue Mar 14 17:23:30 2017 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Mar 21 10:55:24 2017 +0100 ---------------------------------------------------------------------- .../runtime/heartbeat/HeartbeatManagerImpl.java | 14 ++++- .../heartbeat/HeartbeatManagerSenderImpl.java | 12 +++- .../apache/flink/runtime/instance/SlotPool.java | 8 ++- .../registration/RegisteredRpcConnection.java | 4 +- .../registration/RetryingRegistration.java | 8 +-- .../resourcemanager/ResourceManager.java | 4 +- .../runtime/taskexecutor/TaskExecutor.java | 10 +++- .../flink/runtime/jobmanager/JobManager.scala | 12 +++- .../runtime/concurrent/FlinkFutureTest.java | 60 ++++++++++++++++++++ .../flink/runtime/instance/SlotPoolTest.java | 3 +- .../taskexecutor/TaskExecutorITCase.java | 5 ++ .../runtime/taskexecutor/TaskExecutorTest.java | 16 ++++-- 12 files changed, 131 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java index 9860b4d..28ab086 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.heartbeat; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.AcceptFunction; +import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.util.Preconditions; @@ -179,7 +180,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> { } @Override - public void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload) { + public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) { if (!stopped) { log.debug("Received heartbeat request from {}.", requestOrigin); @@ -193,12 +194,21 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> { Future<O> futurePayload = heartbeatListener.retrievePayload(); if (futurePayload != null) { - futurePayload.thenAcceptAsync(new AcceptFunction<O>() { + Future<Void> sendHeartbeatFuture = futurePayload.thenAcceptAsync(new AcceptFunction<O>() { @Override public void accept(O retrievedPayload) { heartbeatTarget.receiveHeartbeat(getOwnResourceID(), retrievedPayload); } }, executor); + + sendHeartbeatFuture.exceptionally(new ApplyFunction<Throwable, Void>() { + @Override + public Void apply(Throwable failure) { + log.warn("Could not send heartbeat to target with id {}.", requestOrigin, failure); + + return null; + } + }); } else { 
heartbeatTarget.receiveHeartbeat(ownResourceID, null); } http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java index 32f8aa3..53837d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.heartbeat; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.AcceptFunction; +import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.slf4j.Logger; @@ -67,12 +68,21 @@ public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget(); if (futurePayload != null) { - futurePayload.thenAcceptAsync(new AcceptFunction<O>() { + Future<Void> requestHeartbeatFuture = futurePayload.thenAcceptAsync(new AcceptFunction<O>() { @Override public void accept(O payload) { heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload); } }, getExecutor()); + + requestHeartbeatFuture.exceptionally(new ApplyFunction<Throwable, Void>() { + @Override + public Void apply(Throwable failure) { + log.warn("Could not request the heartbeat from target {}.", heartbeatTarget, failure); + + return null; + } + }); } else { heartbeatTarget.requestHeartbeat(getOwnResourceID(), null); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 8ba5040..6d482b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -321,7 +321,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { resourceManagerRequestsTimeout); // on success, trigger let the slot pool know - rmResponse.thenAcceptAsync(new AcceptFunction<RMSlotRequestReply>() { + Future<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(new AcceptFunction<RMSlotRequestReply>() { @Override public void accept(RMSlotRequestReply reply) { if (reply.getAllocationID() != null && reply.getAllocationID().equals(allocationID)) { @@ -346,7 +346,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { }, getMainThreadExecutor()); // on failure, fail the request future - rmResponse.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { + slotRequestProcessingFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { @Override public Void apply(Throwable failure) { @@ -372,6 +372,10 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { if (request != null) { request.future().completeExceptionally(new NoResourceAvailableException( "No pooled slot available and request to ResourceManager for new slot failed", failure)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Unregistered slot request {} failed.", allocationID, failure); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java index 78d4dbc..8ebbf92 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java @@ -95,7 +95,7 @@ public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Succes Future<Tuple2<Gateway, Success>> future = pendingRegistration.getFuture(); - future.thenAcceptAsync(new AcceptFunction<Tuple2<Gateway, Success>>() { + Future<Void> registrationSuccessFuture = future.thenAcceptAsync(new AcceptFunction<Tuple2<Gateway, Success>>() { @Override public void accept(Tuple2<Gateway, Success> result) { targetGateway = result.f0; @@ -104,7 +104,7 @@ public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Succes }, executor); // this future should only ever fail if there is a bug, not if the registration is declined - future.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { + registrationSuccessFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { @Override public Void apply(Throwable failure) { onRegistrationFailure(failure); http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java index 32dd978..a470b49 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java @@ -182,7 +182,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e Future<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType); // upon success, start the registration attempts - resourceManagerFuture.thenAcceptAsync(new AcceptFunction<Gateway>() { + Future<Void> resourceManagerAcceptFuture = resourceManagerFuture.thenAcceptAsync(new AcceptFunction<Gateway>() { @Override public void accept(Gateway result) { log.info("Resolved {} address, beginning registration", targetName); @@ -191,7 +191,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e }, rpcService.getExecutor()); // upon failure, retry, unless this is cancelled - resourceManagerFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { + resourceManagerAcceptFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { @Override public Void apply(Throwable failure) { if (!isCanceled()) { @@ -225,7 +225,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e Future<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis); // if the registration was successful, let the TaskExecutor know - registrationFuture.thenAcceptAsync(new AcceptFunction<RegistrationResponse>() { + Future<Void> registrationAcceptFuture = registrationFuture.thenAcceptAsync(new AcceptFunction<RegistrationResponse>() { @Override public void accept(RegistrationResponse result) { if (!isCanceled()) { @@ -251,7 +251,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e }, rpcService.getExecutor()); // upon failure, retry - registrationFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { + registrationAcceptFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { @Override public Void apply(Throwable failure) { if (!isCanceled()) { 
http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 91fbba6..1430a49 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -456,7 +456,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> } else { Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService().connect(address, InfoMessageListenerRpcGateway.class); - infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() { + Future<Void> infoMessageListenerAcceptFuture = infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() { @Override public void accept(InfoMessageListenerRpcGateway gateway) { log.info("Receive a registration from info message listener on ({})", address); @@ -464,7 +464,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> } }, getMainThreadExecutor()); - infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { + infoMessageListenerAcceptFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { @Override public Void apply(Throwable failure) { log.warn("Receive a registration from unreachable info message listener on ({})", address); http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 00a1bf8..83c225f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -715,7 +715,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { leaderId, taskManagerConfiguration.getTimeout()); - acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<SlotOffer>>() { + Future<Void> acceptedSlotsAcceptFuture = acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<SlotOffer>>() { @Override public void accept(Iterable<SlotOffer> acceptedSlots) { // check if the response is still valid @@ -738,13 +738,17 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } }, getMainThreadExecutor()); - acceptedSlotsFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { + acceptedSlotsAcceptFuture.exceptionally(new ApplyFunction<Throwable, Void>() { @Override public Void apply(Throwable throwable) { if (throwable instanceof TimeoutException) { + log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering."); // We ran into a timeout. Try again. offerSlotsToJobManager(jobId); } else { + log.warn("Slot offering to JobManager failed. Freeing the slots " + + "and returning them to the ResourceManager.", throwable); + // We encountered an exception. Free the slots and return them to the RM. 
for (SlotOffer reservedSlot: reservedSlots) { freeSlot(reservedSlot.getAllocationId(), throwable); @@ -753,7 +757,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { return null; } - }, getMainThreadExecutor()); + }); } else { log.debug("There are no unassigned slots for the job {}.", jobId); } http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 9d53aa2..73d093b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -44,7 +44,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.concurrent.{AcceptFunction, BiFunction, Executors => FlinkExecutors} +import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, BiFunction, Executors => FlinkExecutors} import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory @@ -1094,12 +1094,20 @@ class JobManager( val originalSender = new AkkaActorGateway(sender(), leaderSessionID.orNull) - stackTraceFuture.thenAccept(new AcceptFunction[StackTrace] { + val sendingFuture = stackTraceFuture.thenAccept(new AcceptFunction[StackTrace] { override def accept(value: StackTrace): Unit = { originalSender.tell(value) 
} }) + sendingFuture.exceptionally(new ApplyFunction[Throwable, Void] { + override def apply(value: Throwable): Void = { + log.info("Could not send requested stack trace.", value) + + return null + } + }) + case Terminated(taskManagerActorRef) => taskManagerMap.get(taskManagerActorRef) match { case Some(instanceId) => handleTaskManagerTerminated(taskManagerActorRef, instanceId) http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java index 0bdc563..6808a5c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java @@ -536,6 +536,66 @@ public class FlinkFutureTest extends TestLogger { } } + /** + * Tests that a chain of dependent futures will be completed exceptionally if the initial future + * is completed exceptionally. 
+ */ + @Test(timeout = 10000) + public void testChainedFutureExceptionalCompletion() throws ExecutionException, InterruptedException { + final FlinkCompletableFuture<String> future = new FlinkCompletableFuture<>(); + + Future<String> apply = future.thenApplyAsync(new ApplyFunction<String, String>() { + @Override + public String apply(String value) { + return value; + } + }, executor); + + Future<Throwable> applyException = apply.exceptionallyAsync(new ApplyFunction<Throwable, Throwable>() { + @Override + public Throwable apply(Throwable value) { + return value; + } + }, executor); + + Future<Void> accept1 = future.thenAcceptAsync(new AcceptFunction<String>() { + @Override + public void accept(String value) { + // noop + } + }, executor); + + Future<Throwable> accept1Exception = accept1.exceptionallyAsync(new ApplyFunction<Throwable, Throwable>() { + @Override + public Throwable apply(Throwable value) { + return value; + } + }, executor); + + Future<Void> accept2 = future.thenAcceptAsync(new AcceptFunction<String>() { + @Override + public void accept(String value) { + // noop + } + }, executor); + + Future<Throwable> accept2Exception = accept2.exceptionallyAsync(new ApplyFunction<Throwable, Throwable>() { + @Override + public Throwable apply(Throwable value) { + return value; + } + }, executor); + + TestException testException = new TestException("test"); + + // fail the initial future + future.completeExceptionally(testException); + + assertEquals(testException, applyException.get()); + assertEquals(testException, accept1Exception.get()); + assertEquals(testException, accept2Exception.get()); + } + private static class TestException extends RuntimeException { private static final long serialVersionUID = -1274022962838535130L; http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java index 538e286..b4149b2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java @@ -49,6 +49,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -82,7 +83,7 @@ public class SlotPoolTest extends TestLogger { this.resourceManagerGateway = mock(ResourceManagerGateway.class); when(resourceManagerGateway .requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class))) - .thenReturn(mock(Future.class)); + .thenReturn(mock(Future.class, RETURNS_MOCKS)); slotPool.connectToResourceManager(UUID.randomUUID(), resourceManagerGateway); } http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 43f33a3..e74ba29 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -150,6 +150,11 @@ public class TaskExecutorITCase { when(jmGateway.registerTaskManager(any(String.class), any(TaskManagerLocation.class), eq(jmLeaderId), any(Time.class))) 
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(taskManagerResourceId, 1234))); when(jmGateway.getHostname()).thenReturn(jmAddress); + when(jmGateway.offerSlots( + eq(taskManagerResourceId), + any(Iterable.class), + eq(jmLeaderId), + any(Time.class))).thenReturn(mock(Future.class, RETURNS_MOCKS)); rpcService.registerGateway(rmAddress, resourceManager.getSelf()); http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 12ec69d..5702eeb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -67,7 +67,6 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; -import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; @@ -216,13 +215,11 @@ public class TaskExecutorTest extends TestLogger { final TestingSerialRpcService rpc = new TestingSerialRpcService(); try { - final FatalErrorHandler errorHandler = mock(FatalErrorHandler.class); - // register a mock resource manager gateway ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class); 
when(rmGateway.registerTaskExecutor( any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class))) - .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new RegistrationResponse.Success())); + .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(new InstanceID(), 10L))); TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class); when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); @@ -289,10 +286,12 @@ public class TaskExecutorTest extends TestLogger { when(rmGateway1.registerTaskExecutor( any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class))) - .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new RegistrationResponse.Success())); + .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed( + new TaskExecutorRegistrationSuccess(new InstanceID(), 10L))); when(rmGateway2.registerTaskExecutor( any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class))) - .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new RegistrationResponse.Success())); + .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed( + new TaskExecutorRegistrationSuccess(new InstanceID(), 10L))); rpc.registerGateway(address1, rmGateway1); rpc.registerGateway(address2, rmGateway2); @@ -547,6 +546,11 @@ public class TaskExecutorTest extends TestLogger { any(Time.class) )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress); + when(jobMasterGateway.offerSlots( + any(ResourceID.class), + any(Iterable.class), + any(UUID.class), + any(Time.class))).thenReturn(mock(Future.class, RETURNS_MOCKS)); rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); 
rpc.registerGateway(jobManagerAddress, jobMasterGateway);
