Repository: flink
Updated Branches:
  refs/heads/master a30355356 -> ffb056072


[FLINK-6050] [robustness] Register exception handler on thenAccept futures

When applying an AcceptFunction on a Future x, then we should register the 
exception handler
on the returned thenAccept future instead of on x. This has the advantage that 
we also catch
exceptions which are thrown inside of the AcceptFunction and not only those 
which originate
from x. The PR adapts the code respectively.

This closes #3537.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ffb05607
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ffb05607
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ffb05607

Branch: refs/heads/master
Commit: ffb0560728df53ac61d77a029562dbd5c0da8d75
Parents: a303553
Author: Till Rohrmann <[email protected]>
Authored: Tue Mar 14 17:23:30 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Mar 21 10:55:24 2017 +0100

----------------------------------------------------------------------
 .../runtime/heartbeat/HeartbeatManagerImpl.java | 14 ++++-
 .../heartbeat/HeartbeatManagerSenderImpl.java   | 12 +++-
 .../apache/flink/runtime/instance/SlotPool.java |  8 ++-
 .../registration/RegisteredRpcConnection.java   |  4 +-
 .../registration/RetryingRegistration.java      |  8 +--
 .../resourcemanager/ResourceManager.java        |  4 +-
 .../runtime/taskexecutor/TaskExecutor.java      | 10 +++-
 .../flink/runtime/jobmanager/JobManager.scala   | 12 +++-
 .../runtime/concurrent/FlinkFutureTest.java     | 60 ++++++++++++++++++++
 .../flink/runtime/instance/SlotPoolTest.java    |  3 +-
 .../taskexecutor/TaskExecutorITCase.java        |  5 ++
 .../runtime/taskexecutor/TaskExecutorTest.java  | 16 ++++--
 12 files changed, 131 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index 9860b4d..28ab086 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.heartbeat;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.util.Preconditions;
@@ -179,7 +180,7 @@ public class HeartbeatManagerImpl<I, O> implements 
HeartbeatManager<I, O> {
        }
 
        @Override
-       public void requestHeartbeat(ResourceID requestOrigin, I 
heartbeatPayload) {
+       public void requestHeartbeat(final ResourceID requestOrigin, I 
heartbeatPayload) {
                if (!stopped) {
                        log.debug("Received heartbeat request from {}.", 
requestOrigin);
 
@@ -193,12 +194,21 @@ public class HeartbeatManagerImpl<I, O> implements 
HeartbeatManager<I, O> {
                                Future<O> futurePayload = 
heartbeatListener.retrievePayload();
 
                                if (futurePayload != null) {
-                                       futurePayload.thenAcceptAsync(new 
AcceptFunction<O>() {
+                                       Future<Void> sendHeartbeatFuture = 
futurePayload.thenAcceptAsync(new AcceptFunction<O>() {
                                                @Override
                                                public void accept(O 
retrievedPayload) {
                                                        
heartbeatTarget.receiveHeartbeat(getOwnResourceID(), retrievedPayload);
                                                }
                                        }, executor);
+
+                                       sendHeartbeatFuture.exceptionally(new 
ApplyFunction<Throwable, Void>() {
+                                               @Override
+                                               public Void apply(Throwable 
failure) {
+                                                       log.warn("Could not 
send heartbeat to target with id {}.", requestOrigin, failure);
+
+                                                       return null;
+                                               }
+                                       });
                                } else {
                                        
heartbeatTarget.receiveHeartbeat(ownResourceID, null);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
index 32f8aa3..53837d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.heartbeat;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.slf4j.Logger;
@@ -67,12 +68,21 @@ public class HeartbeatManagerSenderImpl<I, O> extends 
HeartbeatManagerImpl<I, O>
                                final HeartbeatTarget<O> heartbeatTarget = 
heartbeatMonitor.getHeartbeatTarget();
 
                                if (futurePayload != null) {
-                                       futurePayload.thenAcceptAsync(new 
AcceptFunction<O>() {
+                                       Future<Void> requestHeartbeatFuture = 
futurePayload.thenAcceptAsync(new AcceptFunction<O>() {
                                                @Override
                                                public void accept(O payload) {
                                                        
heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
                                                }
                                        }, getExecutor());
+
+                                       
requestHeartbeatFuture.exceptionally(new ApplyFunction<Throwable, Void>() {
+                                               @Override
+                                               public Void apply(Throwable 
failure) {
+                                                       log.warn("Could not 
request the heartbeat from target {}.", heartbeatTarget, failure);
+
+                                                       return null;
+                                               }
+                                       });
                                } else {
                                        
heartbeatTarget.requestHeartbeat(getOwnResourceID(), null);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 8ba5040..6d482b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -321,7 +321,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
                                resourceManagerRequestsTimeout);
 
                // on success, trigger let the slot pool know
-               rmResponse.thenAcceptAsync(new 
AcceptFunction<RMSlotRequestReply>() {
+               Future<Void> slotRequestProcessingFuture = 
rmResponse.thenAcceptAsync(new AcceptFunction<RMSlotRequestReply>() {
                        @Override
                        public void accept(RMSlotRequestReply reply) {
                                if (reply.getAllocationID() != null && 
reply.getAllocationID().equals(allocationID)) {
@@ -346,7 +346,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
                }, getMainThreadExecutor());
 
                // on failure, fail the request future
-               rmResponse.exceptionallyAsync(new ApplyFunction<Throwable, 
Void>() {
+               slotRequestProcessingFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
 
                        @Override
                        public Void apply(Throwable failure) {
@@ -372,6 +372,10 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> 
{
                if (request != null) {
                        request.future().completeExceptionally(new 
NoResourceAvailableException(
                                        "No pooled slot available and request 
to ResourceManager for new slot failed", failure));
+               } else {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Unregistered slot request {} 
failed.", allocationID, failure);
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
index 78d4dbc..8ebbf92 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
@@ -95,7 +95,7 @@ public abstract class RegisteredRpcConnection<Gateway extends 
RpcGateway, Succes
 
                Future<Tuple2<Gateway, Success>> future = 
pendingRegistration.getFuture();
 
-               future.thenAcceptAsync(new AcceptFunction<Tuple2<Gateway, 
Success>>() {
+               Future<Void> registrationSuccessFuture = 
future.thenAcceptAsync(new AcceptFunction<Tuple2<Gateway, Success>>() {
                        @Override
                        public void accept(Tuple2<Gateway, Success> result) {
                                targetGateway = result.f0;
@@ -104,7 +104,7 @@ public abstract class RegisteredRpcConnection<Gateway 
extends RpcGateway, Succes
                }, executor);
 
                // this future should only ever fail if there is a bug, not if 
the registration is declined
-               future.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+               registrationSuccessFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
                        @Override
                        public Void apply(Throwable failure) {
                                onRegistrationFailure(failure);

http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index 32dd978..a470b49 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -182,7 +182,7 @@ public abstract class RetryingRegistration<Gateway extends 
RpcGateway, Success e
                        Future<Gateway> resourceManagerFuture = 
rpcService.connect(targetAddress, targetType);
        
                        // upon success, start the registration attempts
-                       resourceManagerFuture.thenAcceptAsync(new 
AcceptFunction<Gateway>() {
+                       Future<Void> resourceManagerAcceptFuture = 
resourceManagerFuture.thenAcceptAsync(new AcceptFunction<Gateway>() {
                                @Override
                                public void accept(Gateway result) {
                                        log.info("Resolved {} address, 
beginning registration", targetName);
@@ -191,7 +191,7 @@ public abstract class RetryingRegistration<Gateway extends 
RpcGateway, Success e
                        }, rpcService.getExecutor());
 
                        // upon failure, retry, unless this is cancelled
-                       resourceManagerFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
+                       resourceManagerAcceptFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
                                @Override
                                public Void apply(Throwable failure) {
                                        if (!isCanceled()) {
@@ -225,7 +225,7 @@ public abstract class RetryingRegistration<Gateway extends 
RpcGateway, Success e
                        Future<RegistrationResponse> registrationFuture = 
invokeRegistration(gateway, leaderId, timeoutMillis);
        
                        // if the registration was successful, let the 
TaskExecutor know
-                       registrationFuture.thenAcceptAsync(new 
AcceptFunction<RegistrationResponse>() {
+                       Future<Void> registrationAcceptFuture = 
registrationFuture.thenAcceptAsync(new AcceptFunction<RegistrationResponse>() {
                                @Override
                                public void accept(RegistrationResponse result) 
{
                                        if (!isCanceled()) {
@@ -251,7 +251,7 @@ public abstract class RetryingRegistration<Gateway extends 
RpcGateway, Success e
                        }, rpcService.getExecutor());
        
                        // upon failure, retry
-                       registrationFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
+                       registrationAcceptFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
                                @Override
                                public Void apply(Throwable failure) {
                                        if (!isCanceled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 91fbba6..1430a49 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -456,7 +456,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                } else {
                        Future<InfoMessageListenerRpcGateway> 
infoMessageListenerRpcGatewayFuture = getRpcService().connect(address, 
InfoMessageListenerRpcGateway.class);
 
-                       infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new 
AcceptFunction<InfoMessageListenerRpcGateway>() {
+                       Future<Void> infoMessageListenerAcceptFuture = 
infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new 
AcceptFunction<InfoMessageListenerRpcGateway>() {
                                @Override
                                public void 
accept(InfoMessageListenerRpcGateway gateway) {
                                        log.info("Receive a registration from 
info message listener on ({})", address);
@@ -464,7 +464,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                                }
                        }, getMainThreadExecutor());
 
-                       
infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
+                       infoMessageListenerAcceptFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
                                @Override
                                public Void apply(Throwable failure) {
                                        log.warn("Receive a registration from 
unreachable info message listener on ({})", address);

http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 00a1bf8..83c225f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -715,7 +715,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                                        leaderId,
                                        taskManagerConfiguration.getTimeout());
 
-                               acceptedSlotsFuture.thenAcceptAsync(new 
AcceptFunction<Iterable<SlotOffer>>() {
+                               Future<Void> acceptedSlotsAcceptFuture = 
acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<SlotOffer>>() {
                                        @Override
                                        public void accept(Iterable<SlotOffer> 
acceptedSlots) {
                                                // check if the response is 
still valid
@@ -738,13 +738,17 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                                        }
                                }, getMainThreadExecutor());
 
-                               acceptedSlotsFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
+                               acceptedSlotsAcceptFuture.exceptionally(new 
ApplyFunction<Throwable, Void>() {
                                        @Override
                                        public Void apply(Throwable throwable) {
                                                if (throwable instanceof 
TimeoutException) {
+                                                       log.info("Slot offering 
to JobManager did not finish in time. Retrying the slot offering.");
                                                        // We ran into a 
timeout. Try again.
                                                        
offerSlotsToJobManager(jobId);
                                                } else {
+                                                       log.warn("Slot offering 
to JobManager failed. Freeing the slots " +
+                                                               "and returning 
them to the ResourceManager.", throwable);
+
                                                        // We encountered an 
exception. Free the slots and return them to the RM.
                                                        for (SlotOffer 
reservedSlot: reservedSlots) {
                                                                
freeSlot(reservedSlot.getAllocationId(), throwable);
@@ -753,7 +757,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
                                                return null;
                                        }
-                               }, getMainThreadExecutor());
+                               });
                        } else {
                                log.debug("There are no unassigned slots for 
the job {}.", jobId);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9d53aa2..73d093b 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -44,7 +44,7 @@ import 
org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.messages._
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.{AcceptFunction, BiFunction, 
Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, 
BiFunction, Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -1094,12 +1094,20 @@ class JobManager(
 
       val originalSender = new AkkaActorGateway(sender(), 
leaderSessionID.orNull)
 
-      stackTraceFuture.thenAccept(new AcceptFunction[StackTrace] {
+      val sendingFuture = stackTraceFuture.thenAccept(new 
AcceptFunction[StackTrace] {
         override def accept(value: StackTrace): Unit = {
           originalSender.tell(value)
         }
       })
 
+      sendingFuture.exceptionally(new ApplyFunction[Throwable, Void] {
+        override def apply(value: Throwable): Void = {
+          log.info("Could not send requested stack trace.", value)
+
+          return null
+        }
+      })
+
     case Terminated(taskManagerActorRef) =>
       taskManagerMap.get(taskManagerActorRef) match {
         case Some(instanceId) => 
handleTaskManagerTerminated(taskManagerActorRef, instanceId)

http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
index 0bdc563..6808a5c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
@@ -536,6 +536,66 @@ public class FlinkFutureTest extends TestLogger {
                }
        }
 
+       /**
+        * Tests that a chain of dependent futures will be completed 
exceptionally if the initial future
+        * is completed exceptionally.
+        */
+       @Test(timeout = 10000)
+       public void testChainedFutureExceptionalCompletion() throws 
ExecutionException, InterruptedException {
+               final FlinkCompletableFuture<String> future = new 
FlinkCompletableFuture<>();
+
+               Future<String> apply = future.thenApplyAsync(new 
ApplyFunction<String, String>() {
+                       @Override
+                       public String apply(String value) {
+                               return value;
+                       }
+               }, executor);
+
+               Future<Throwable> applyException = apply.exceptionallyAsync(new 
ApplyFunction<Throwable, Throwable>() {
+                       @Override
+                       public Throwable apply(Throwable value) {
+                               return value;
+                       }
+               }, executor);
+
+               Future<Void> accept1 = future.thenAcceptAsync(new 
AcceptFunction<String>() {
+                       @Override
+                       public void accept(String value) {
+                               // noop
+                       }
+               }, executor);
+
+               Future<Throwable> accept1Exception = 
accept1.exceptionallyAsync(new ApplyFunction<Throwable, Throwable>() {
+                       @Override
+                       public Throwable apply(Throwable value) {
+                               return value;
+                       }
+               }, executor);
+
+               Future<Void> accept2 = future.thenAcceptAsync(new 
AcceptFunction<String>() {
+                       @Override
+                       public void accept(String value) {
+                               // noop
+                       }
+               }, executor);
+
+               Future<Throwable> accept2Exception = 
accept2.exceptionallyAsync(new ApplyFunction<Throwable, Throwable>() {
+                       @Override
+                       public Throwable apply(Throwable value) {
+                               return value;
+                       }
+               }, executor);
+
+               TestException testException = new TestException("test");
+
+               // fail the initial future
+               future.completeExceptionally(testException);
+
+               assertEquals(testException, applyException.get());
+               assertEquals(testException, accept1Exception.get());
+               assertEquals(testException, accept2Exception.get());
+       }
+
        private static class TestException extends RuntimeException {
 
                private static final long serialVersionUID = 
-1274022962838535130L;

http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 538e286..b4149b2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -49,6 +49,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.RETURNS_MOCKS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -82,7 +83,7 @@ public class SlotPoolTest extends TestLogger {
                this.resourceManagerGateway = 
mock(ResourceManagerGateway.class);
                when(resourceManagerGateway
                        .requestSlot(any(UUID.class), any(UUID.class), 
any(SlotRequest.class), any(Time.class)))
-                       .thenReturn(mock(Future.class));
+                       .thenReturn(mock(Future.class, RETURNS_MOCKS));
 
                slotPool.connectToResourceManager(UUID.randomUUID(), 
resourceManagerGateway);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 43f33a3..e74ba29 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -150,6 +150,11 @@ public class TaskExecutorITCase {
                when(jmGateway.registerTaskManager(any(String.class), 
any(TaskManagerLocation.class), eq(jmLeaderId), any(Time.class)))
                        
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(taskManagerResourceId, 1234)));
                when(jmGateway.getHostname()).thenReturn(jmAddress);
+               when(jmGateway.offerSlots(
+                       eq(taskManagerResourceId),
+                       any(Iterable.class),
+                       eq(jmLeaderId),
+                       any(Time.class))).thenReturn(mock(Future.class, 
RETURNS_MOCKS));
 
 
                rpcService.registerGateway(rmAddress, 
resourceManager.getSelf());

http://git-wip-us.apache.org/repos/asf/flink/blob/ffb05607/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 12ec69d..5702eeb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -67,7 +67,6 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -216,13 +215,11 @@ public class TaskExecutorTest extends TestLogger {
 
                final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
                try {
-                       final FatalErrorHandler errorHandler = 
mock(FatalErrorHandler.class);
-
                        // register a mock resource manager gateway
                        ResourceManagerGateway rmGateway = 
mock(ResourceManagerGateway.class);
                        when(rmGateway.registerTaskExecutor(
                                        any(UUID.class), anyString(), 
any(ResourceID.class), any(SlotReport.class), any(Time.class)))
-                               
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
RegistrationResponse.Success()));
+                               
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
TaskExecutorRegistrationSuccess(new InstanceID(), 10L)));
 
                        TaskManagerConfiguration 
taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
                        
when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
@@ -289,10 +286,12 @@ public class TaskExecutorTest extends TestLogger {
 
                        when(rmGateway1.registerTaskExecutor(
                                        any(UUID.class), anyString(), 
any(ResourceID.class), any(SlotReport.class), any(Time.class)))
-                                       
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
RegistrationResponse.Success()));
+                                       
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
+                                               new 
TaskExecutorRegistrationSuccess(new InstanceID(), 10L)));
                        when(rmGateway2.registerTaskExecutor(
                                        any(UUID.class), anyString(), 
any(ResourceID.class), any(SlotReport.class), any(Time.class)))
-                                       
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
RegistrationResponse.Success()));
+                                       
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
+                                               new 
TaskExecutorRegistrationSuccess(new InstanceID(), 10L)));
 
                        rpc.registerGateway(address1, rmGateway1);
                        rpc.registerGateway(address2, rmGateway2);
@@ -547,6 +546,11 @@ public class TaskExecutorTest extends TestLogger {
                                any(Time.class)
                
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
                
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
+               when(jobMasterGateway.offerSlots(
+                       any(ResourceID.class),
+                       any(Iterable.class),
+                       any(UUID.class),
+                       any(Time.class))).thenReturn(mock(Future.class, 
RETURNS_MOCKS));
 
                rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
                rpc.registerGateway(jobManagerAddress, jobMasterGateway);

Reply via email to