[FLINK-7387] [rpc] Require RpcEndpoints to directly implement RpcGateways

This commit changes the relation between RpcEndpoints and RpcGateways. From now on,
RpcEndpoints have to implement the RpcGateways they want to support instead of
being loosely coupled to them via a type parameter. In order to obtain the self
gateway, a new method RpcEndpoint#getSelfGateway(Class) has been introduced. This
method can be used to obtain a gateway of the given RpcGateway type at run time in
order to talk to the RpcEndpoint asynchronously.

All existing RpcEndpoints have been adapted to the new model. This basically means
that their RPC methods now return a CompletableFuture<X> instead of X.

Add RpcEndpointTest

This closes #4498.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d95d20eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d95d20eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d95d20eb

Branch: refs/heads/master
Commit: d95d20eb45185151d731c9b41e9b1ac22cf81334
Parents: 9f790d3
Author: Till Rohrmann <[email protected]>
Authored: Tue Aug 8 14:43:47 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Fri Aug 11 13:49:44 2017 +0200

----------------------------------------------------------------------
 .../MesosResourceManagerTest.java               |   6 +-
 .../flink/runtime/dispatcher/Dispatcher.java    |  32 +-
 .../apache/flink/runtime/instance/SlotPool.java |  81 ++--
 .../flink/runtime/instance/SlotPoolGateway.java |   3 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   2 +-
 .../flink/runtime/jobmaster/JobMaster.java      | 212 +++++----
 .../runtime/jobmaster/JobMasterGateway.java     |   3 +-
 .../resourcemanager/ResourceManager.java        |  66 +--
 .../runtime/rpc/MainThreadValidatorUtil.java    |   4 +-
 .../apache/flink/runtime/rpc/RpcEndpoint.java   | 107 ++---
 .../org/apache/flink/runtime/rpc/RpcMethod.java |  37 --
 .../org/apache/flink/runtime/rpc/RpcServer.java |  34 ++
 .../apache/flink/runtime/rpc/RpcService.java    |  10 +-
 .../org/apache/flink/runtime/rpc/RpcUtils.java  |  52 +++
 .../apache/flink/runtime/rpc/SelfGateway.java   |  34 --
 .../runtime/rpc/akka/AkkaInvocationHandler.java |  88 +---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    |   3 +-
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |  28 +-
 .../runtime/taskexecutor/TaskExecutor.java      | 351 +++++++-------
 .../clusterframework/ResourceManagerTest.java   |   6 +-
 .../runtime/dispatcher/DispatcherTest.java      |   2 +-
 .../flink/runtime/instance/SlotPoolRpcTest.java |   2 +-
 .../flink/runtime/instance/SlotPoolTest.java    |  34 +-
 .../jobmaster/JobManagerRunnerMockTest.java     |   3 +-
 .../flink/runtime/jobmaster/JobMasterTest.java  |   2 +-
 .../ResourceManagerJobMasterTest.java           |  17 +-
 .../ResourceManagerTaskExecutorTest.java        |   8 +-
 .../flink/runtime/rpc/AsyncCallsTest.java       |  10 +-
 .../flink/runtime/rpc/RpcCompletenessTest.java  | 452 -------------------
 .../flink/runtime/rpc/RpcEndpointTest.java      | 195 ++++++++
 .../runtime/rpc/TestingSerialRpcService.java    |  95 +---
 .../runtime/rpc/akka/AkkaRpcActorTest.java      |  29 +-
 .../rpc/akka/MainThreadValidationTest.java      |   9 +-
 .../rpc/akka/MessageSerializationTest.java      |   7 +-
 .../taskexecutor/TaskExecutorITCase.java        |   8 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  41 +-
 36 files changed, 920 insertions(+), 1153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index e63b4ab..f9e35a9 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -122,6 +122,8 @@ public class MesosResourceManagerTest extends TestLogger {
 
        private static ActorSystem system;
 
+       private static final Time timeout = Time.seconds(10L);
+
        @Before
        public void setup() {
                system = AkkaUtils.createLocalActorSystem(flinkConfig);
@@ -415,7 +417,7 @@ public class MesosResourceManagerTest extends TestLogger {
                 */
                public void registerJobMaster(MockJobMaster jobMaster) throws 
Exception  {
                        CompletableFuture<RegistrationResponse> registration = 
resourceManager.registerJobManager(
-                               rmServices.rmLeaderSessionId, 
jobMaster.leaderSessionID, jobMaster.resourceID, jobMaster.address, 
jobMaster.jobID);
+                               rmServices.rmLeaderSessionId, 
jobMaster.leaderSessionID, jobMaster.resourceID, jobMaster.address, 
jobMaster.jobID, timeout);
                        assertTrue(registration.get() instanceof 
JobMasterRegistrationSuccess);
                }
 
@@ -589,7 +591,7 @@ public class MesosResourceManagerTest extends TestLogger {
 
                        // send registration message
                        CompletableFuture<RegistrationResponse> 
successfulFuture =
-                               
resourceManager.registerTaskExecutor(rmServices.rmLeaderSessionId, 
task1Executor.address, task1Executor.resourceID, slotReport);
+                               
resourceManager.registerTaskExecutor(rmServices.rmLeaderSessionId, 
task1Executor.address, task1Executor.resourceID, slotReport, timeout);
                        RegistrationResponse response = successfulFuture.get(5, 
TimeUnit.SECONDS);
                        assertTrue(response instanceof 
TaskExecutorRegistrationSuccess);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 2eb0e36..9fc1fc4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -20,11 +20,13 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
@@ -37,7 +39,6 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -47,6 +48,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Base class for the Dispatcher component. The Dispatcher component is 
responsible
@@ -54,7 +56,7 @@ import java.util.Map;
  * the jobs and to recover them in case of a master failure. Furthermore, it 
knows
  * about the state of the Flink session cluster.
  */
-public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> {
+public abstract class Dispatcher extends RpcEndpoint implements 
DispatcherGateway {
 
        public static final String DISPATCHER_NAME = "dispatcher";
 
@@ -131,8 +133,8 @@ public abstract class Dispatcher extends 
RpcEndpoint<DispatcherGateway> {
        // RPCs
        //------------------------------------------------------
 
-       @RpcMethod
-       public Acknowledge submitJob(JobGraph jobGraph) throws 
JobSubmissionException {
+       @Override
+       public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time 
timeout) {
                final JobID jobId = jobGraph.getJobID();
 
                log.info("Submitting job {} ({}).", jobGraph.getJobID(), 
jobGraph.getName());
@@ -143,7 +145,8 @@ public abstract class Dispatcher extends 
RpcEndpoint<DispatcherGateway> {
                        jobSchedulingStatus = 
runningJobsRegistry.getJobSchedulingStatus(jobId);
                } catch (IOException e) {
                        log.warn("Cannot retrieve job status for {}.", jobId, 
e);
-                       throw new JobSubmissionException(jobId, "Could not 
retrieve the job status.", e);
+                       return FutureUtils.completedExceptionally(
+                               new JobSubmissionException(jobId, "Could not 
retrieve the job status.", e));
                }
 
                if (jobSchedulingStatus == 
RunningJobsRegistry.JobSchedulingStatus.PENDING) {
@@ -151,7 +154,8 @@ public abstract class Dispatcher extends 
RpcEndpoint<DispatcherGateway> {
                                submittedJobGraphStore.putJobGraph(new 
SubmittedJobGraph(jobGraph, null));
                        } catch (Exception e) {
                                log.warn("Cannot persist JobGraph.", e);
-                               throw new JobSubmissionException(jobId, "Could 
not persist JobGraph.", e);
+                               return FutureUtils.completedExceptionally(
+                                       new JobSubmissionException(jobId, 
"Could not persist JobGraph.", e));
                        }
 
                        final JobManagerRunner jobManagerRunner;
@@ -180,22 +184,24 @@ public abstract class Dispatcher extends 
RpcEndpoint<DispatcherGateway> {
                                        e.addSuppressed(t);
                                }
 
-                               throw new JobSubmissionException(jobId, "Could 
not start JobManager.", e);
+                               return FutureUtils.completedExceptionally(
+                                       new JobSubmissionException(jobId, 
"Could not start JobManager.", e));
                        }
 
                        jobManagerRunners.put(jobId, jobManagerRunner);
 
-                       return Acknowledge.get();
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
                } else {
-                       throw new JobSubmissionException(jobId, "Job has 
already been submitted and " +
-                               "is currently in state " + jobSchedulingStatus 
+ '.');
+                       return FutureUtils.completedExceptionally(
+                               new JobSubmissionException(jobId, "Job has 
already been submitted and " +
+                                       "is currently in state " + 
jobSchedulingStatus + '.'));
                }
        }
 
-       @RpcMethod
-       public Collection<JobID> listJobs() {
+       @Override
+       public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
                // TODO: return proper list of running jobs
-               return jobManagerRunners.keySet();
+               return 
CompletableFuture.completedFuture(jobManagerRunners.keySet());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 508e54f..de2b3e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -35,9 +36,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.clock.Clock;
@@ -46,15 +45,19 @@ import org.apache.flink.runtime.util.clock.SystemClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -73,7 +76,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * TODO : Make pending requests location preference aware
  * TODO : Make pass location preferences to ResourceManager when sending a 
slot request
  */
-public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
+public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
        /** The log for the pool - shared also with the internal classes */
        static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
@@ -154,7 +157,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
                this.pendingRequests = new HashMap<>();
                this.waitingForResourceManager = new HashMap<>();
 
-               this.providerAndOwner = new ProviderAndOwner(getSelf(), 
slotRequestTimeout);
+               this.providerAndOwner = new 
ProviderAndOwner(getSelfGateway(SlotPoolGateway.class), slotRequestTimeout);
        }
 
        // 
------------------------------------------------------------------------
@@ -187,12 +190,12 @@ public class SlotPool extends 
RpcEndpoint<SlotPoolGateway> {
        /**
         * Suspends this pool, meaning it has lost its authority to accept and 
distribute slots.
         */
-       @RpcMethod
+       @Override
        public void suspend() {
                validateRunsInMainThread();
 
                // suspend this RPC endpoint
-               ((StartStoppable) getSelf()).stop();
+               stop();
 
                // do not accept any requests
                jobManagerLeaderId = null;
@@ -236,7 +239,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
        //  Resource Manager Connection
        // 
------------------------------------------------------------------------
 
-       @RpcMethod
+       @Override
        public void connectToResourceManager(UUID resourceManagerLeaderId, 
ResourceManagerGateway resourceManagerGateway) {
                this.resourceManagerLeaderId = 
checkNotNull(resourceManagerLeaderId);
                this.resourceManagerGateway = 
checkNotNull(resourceManagerGateway);
@@ -250,7 +253,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
                waitingForResourceManager.clear();
        }
 
-       @RpcMethod
+       @Override
        public void disconnectResourceManager() {
                this.resourceManagerLeaderId = null;
                this.resourceManagerGateway = null;
@@ -260,16 +263,17 @@ public class SlotPool extends 
RpcEndpoint<SlotPoolGateway> {
        //  Slot Allocation
        // 
------------------------------------------------------------------------
 
-       @RpcMethod
+       @Override
        public CompletableFuture<SimpleSlot> allocateSlot(
                        ScheduledUnit task,
                        ResourceProfile resources,
-                       Iterable<TaskManagerLocation> locationPreferences) {
+                       Iterable<TaskManagerLocation> locationPreferences,
+                       Time timeout) {
 
                return internalAllocateSlot(task, resources, 
locationPreferences);
        }
 
-       @RpcMethod
+       @Override
        public void returnAllocatedSlot(Slot slot) {
                internalReturnAllocatedSlot(slot);
        }
@@ -457,18 +461,39 @@ public class SlotPool extends 
RpcEndpoint<SlotPoolGateway> {
                return null;
        }
 
-       @RpcMethod
-       public Iterable<SlotOffer> offerSlots(Iterable<Tuple2<AllocatedSlot, 
SlotOffer>> offers) {
+       @Override
+       public CompletableFuture<Collection<SlotOffer>> 
offerSlots(Collection<Tuple2<AllocatedSlot, SlotOffer>> offers) {
                validateRunsInMainThread();
 
-               final ArrayList<SlotOffer> result = new ArrayList<>();
-               for (Tuple2<AllocatedSlot, SlotOffer> offer : offers) {
-                       if (offerSlot(offer.f0)) {
-                               result.add(offer.f1);
+               List<CompletableFuture<Optional<SlotOffer>>> acceptedSlotOffers 
= offers.stream().map(
+                       offer -> {
+                               CompletableFuture<Optional<SlotOffer>> 
acceptedSlotOffer = offerSlot(offer.f0).thenApply(
+                                       (acceptedSlot) -> {
+                                               if (acceptedSlot) {
+                                                       return 
Optional.of(offer.f1);
+                                               } else {
+                                                       return Optional.empty();
+                                               }
+                                       });
+
+                               return acceptedSlotOffer;
                        }
-               }
+               ).collect(Collectors.toList());
+
+               CompletableFuture<Collection<Optional<SlotOffer>>> 
optionalSlotOffers = FutureUtils.combineAll(acceptedSlotOffers);
+
+               CompletableFuture<Collection<SlotOffer>> resultingSlotOffers = 
optionalSlotOffers.thenApply(
+                       collection -> {
+                               Collection<SlotOffer> slotOffers = collection
+                                       .stream()
+                                       .flatMap(
+                                               opt -> 
opt.map(Stream::of).orElseGet(Stream::empty))
+                                       .collect(Collectors.toList());
 
-               return result.isEmpty() ? Collections.<SlotOffer>emptyList() : 
result;
+                               return slotOffers;
+                       });
+
+               return resultingSlotOffers;
        }
        
        /**
@@ -480,8 +505,8 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
         * @param slot The offered slot
         * @return True if we accept the offering
         */
-       @RpcMethod
-       public boolean offerSlot(final AllocatedSlot slot) {
+       @Override
+       public CompletableFuture<Boolean> offerSlot(final AllocatedSlot slot) {
                validateRunsInMainThread();
 
                // check if this TaskManager is valid
@@ -491,7 +516,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
                if (!registeredTaskManagers.contains(resourceID)) {
                        LOG.debug("Received outdated slot offering [{}] from 
unregistered TaskManager: {}",
                                        slot.getSlotAllocationId(), slot);
-                       return false;
+                       return CompletableFuture.completedFuture(false);
                }
 
                // check whether we have already using this slot
@@ -500,7 +525,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
                        // return true here so that the sender will get a 
positive acknowledgement to the retry
                        // and mark the offering as a success
-                       return true;
+                       return CompletableFuture.completedFuture(true);
                }
 
                // check whether we have request waiting for this slot
@@ -520,7 +545,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
                // we accepted the request in any case. slot will be released 
after it idled for
                // too long and timed out
-               return true;
+               return CompletableFuture.completedFuture(true);
        }
 
        
@@ -541,7 +566,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
         * @param allocationID Represents the allocation which should be failed
         * @param cause        The cause of the failure
         */
-       @RpcMethod
+       @Override
        public void failAllocation(final AllocationID allocationID, final 
Exception cause) {
                final PendingRequest pendingRequest = 
pendingRequests.remove(allocationID);
                if (pendingRequest != null) {
@@ -576,7 +601,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
         *
         * @param resourceID The id of the TaskManager
         */
-       @RpcMethod
+       @Override
        public void registerTaskManager(final ResourceID resourceID) {
                registeredTaskManagers.add(resourceID);
        }
@@ -587,7 +612,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
         *
         * @param resourceID The id of the TaskManager
         */
-       @RpcMethod
+       @Override
        public void releaseTaskManager(final ResourceID resourceID) {
                if (registeredTaskManagers.remove(resourceID)) {
                        availableSlots.removeAllForTaskManager(resourceID);

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
index 43f407a..8d4f2a5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
@@ -77,7 +78,7 @@ public interface SlotPoolGateway extends RpcGateway {
 
        CompletableFuture<Boolean> offerSlot(AllocatedSlot slot);
 
-       CompletableFuture<Iterable<SlotOffer>> 
offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>> offers);
+       CompletableFuture<Collection<SlotOffer>> 
offerSlots(Collection<Tuple2<AllocatedSlot, SlotOffer>> offers);
        
        void failAllocation(AllocationID allocationID, Exception cause);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index d557f6b..5838cf2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -433,7 +433,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        log.info("JobManager for job {} ({}) was revoked 
leadership at {}.",
                                jobGraph.getName(), jobGraph.getJobID(), 
getAddress());
 
-                       jobManager.getSelf().suspendExecution(new 
Exception("JobManager is no longer the leader."));
+                       
jobManager.getSelfGateway(JobMasterGateway.class).suspendExecution(new 
Exception("JobManager is no longer the leader."));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index cdae89f..31036f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -29,13 +29,13 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -82,9 +82,7 @@ import 
org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -93,12 +91,14 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -122,7 +122,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * given task</li>
  * </ul>
  */
-public class JobMaster extends RpcEndpoint<JobMasterGateway> {
+public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 
        /** Default names for Flink's distributed components */
        public static final String JOB_MANAGER_NAME = "jobmanager";
@@ -133,6 +133,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
        // 
------------------------------------------------------------------------
 
+       private final JobMasterGateway selfGateway;
+
        private final ResourceID resourceId;
 
        /** Logical representation of the job */
@@ -211,6 +213,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
                super(rpcService, 
AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME));
 
+               selfGateway = getSelfGateway(JobMasterGateway.class);
+
                this.resourceId = checkNotNull(resourceId);
                this.jobGraph = checkNotNull(jobGraph);
                this.configuration = checkNotNull(configuration);
@@ -224,7 +228,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
                this.taskManagerHeartbeatManager = 
heartbeatServices.createHeartbeatManagerSender(
                        resourceId,
-                       new TaskManagerHeartbeatListener(),
+                       new TaskManagerHeartbeatListener(selfGateway),
                        rpcService.getScheduledExecutor(),
                        log);
 
@@ -263,7 +267,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                resourceManagerLeaderRetriever = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
 
                this.slotPool = new SlotPool(rpcService, jobGraph.getJobID());
-               this.slotPoolGateway = slotPool.getSelf();
+               this.slotPoolGateway = 
slotPool.getSelfGateway(SlotPoolGateway.class);
 
                this.executionGraph = ExecutionGraphBuilder.buildGraph(
                        null,
@@ -307,7 +311,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        super.start();
 
                        log.info("JobManager started as leader {} for job {}", 
leaderSessionID, jobGraph.getJobID());
-                       getSelf().startJobExecution();
+                       selfGateway.startJobExecution();
                }
                else {
                        log.warn("Job already started with leader ID {}, 
ignoring this start request.", leaderSessionID);
@@ -334,7 +338,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
        //-- job starting and stopping  
-----------------------------------------------------------------
 
-       @RpcMethod
+       @Override
        public void startJobExecution() {
                // double check that the leader status did not change
                if (leaderSessionID == null) {
@@ -393,7 +397,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
         *
         * @param cause The reason of why this job been suspended.
         */
-       @RpcMethod
+       @Override
        public void suspendExecution(final Throwable cause) {
                if (leaderSessionID == null) {
                        log.debug("Job has already been suspended or 
shutdown.");
@@ -413,7 +417,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                executionGraph.suspend(cause);
 
                // receive no more messages until started again, should be 
called before we clear self leader id
-               ((StartStoppable) getSelf()).stop();
+               stop();
 
                // the slot pool stops receiving messages and clears its pooled 
slots 
                slotPoolGateway.suspend();
@@ -430,29 +434,39 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
         * @param taskExecutionState New task execution state for a given task
         * @return Acknowledge the task execution state update
         */
-       @RpcMethod
-       public Acknowledge updateTaskExecutionState(
+       @Override
+       public CompletableFuture<Acknowledge> updateTaskExecutionState(
                        final UUID leaderSessionID,
-                       final TaskExecutionState taskExecutionState) throws 
Exception
+                       final TaskExecutionState taskExecutionState)
        {
                checkNotNull(taskExecutionState, "taskExecutionState");
-               validateLeaderSessionId(leaderSessionID);
+
+               try {
+                       validateLeaderSessionId(leaderSessionID);
+               } catch (LeaderIdMismatchException e) {
+                       return FutureUtils.completedExceptionally(e);
+               }
 
                if (executionGraph.updateState(taskExecutionState)) {
-                       return Acknowledge.get();
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
                } else {
-                       throw new ExecutionGraphException("The execution 
attempt " +
-                                       taskExecutionState.getID() + " was not 
found.");
+                       return FutureUtils.completedExceptionally(
+                               new ExecutionGraphException("The execution 
attempt " +
+                                       taskExecutionState.getID() + " was not 
found."));
                }
        }
 
-       @RpcMethod
-       public SerializedInputSplit requestNextInputSplit(
+       @Override
+       public CompletableFuture<SerializedInputSplit> requestNextInputSplit(
                        final UUID leaderSessionID,
                        final JobVertexID vertexID,
-                       final ExecutionAttemptID executionAttempt) throws 
Exception
-       {
-               validateLeaderSessionId(leaderSessionID);
+                       final ExecutionAttemptID executionAttempt) {
+
+               try {
+                       validateLeaderSessionId(leaderSessionID);
+               } catch (LeaderIdMismatchException e) {
+                       return FutureUtils.completedExceptionally(e);
+               }
 
                final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
                if (execution == null) {
@@ -462,19 +476,19 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                log.debug("Can not find Execution for attempt 
{}.", executionAttempt);
                        }
                        // but we should TaskManager be aware of this
-                       throw new Exception("Can not find Execution for attempt 
" + executionAttempt);
+                       return FutureUtils.completedExceptionally(new 
Exception("Can not find Execution for attempt " + executionAttempt));
                }
 
                final ExecutionJobVertex vertex = 
executionGraph.getJobVertex(vertexID);
                if (vertex == null) {
                        log.error("Cannot find execution vertex for vertex ID 
{}.", vertexID);
-                       throw new Exception("Cannot find execution vertex for 
vertex ID " + vertexID);
+                       return FutureUtils.completedExceptionally(new 
Exception("Cannot find execution vertex for vertex ID " + vertexID));
                }
 
                final InputSplitAssigner splitAssigner = 
vertex.getSplitAssigner();
                if (splitAssigner == null) {
                        log.error("No InputSplitAssigner for vertex ID {}.", 
vertexID);
-                       throw new Exception("No InputSplitAssigner for vertex 
ID " + vertexID);
+                       return FutureUtils.completedExceptionally(new 
Exception("No InputSplitAssigner for vertex ID " + vertexID));
                }
 
                final Slot slot = execution.getAssignedResource();
@@ -488,27 +502,31 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
                try {
                        final byte[] serializedInputSplit = 
InstantiationUtil.serializeObject(nextInputSplit);
-                       return new SerializedInputSplit(serializedInputSplit);
+                       return CompletableFuture.completedFuture(new 
SerializedInputSplit(serializedInputSplit));
                } catch (Exception ex) {
                        log.error("Could not serialize the next input split of 
class {}.", nextInputSplit.getClass(), ex);
                        IOException reason = new IOException("Could not 
serialize the next input split of class " +
                                        nextInputSplit.getClass() + ".", ex);
                        vertex.fail(reason);
-                       throw reason;
+                       return FutureUtils.completedExceptionally(reason);
                }
        }
 
-       @RpcMethod
-       public ExecutionState requestPartitionState(
+       @Override
+       public CompletableFuture<ExecutionState> requestPartitionState(
                        final UUID leaderSessionID,
                        final IntermediateDataSetID intermediateResultId,
-                       final ResultPartitionID resultPartitionId) throws 
Exception {
+                       final ResultPartitionID resultPartitionId) {
 
-               validateLeaderSessionId(leaderSessionID);
+               try {
+                       validateLeaderSessionId(leaderSessionID);
+               } catch (LeaderIdMismatchException e) {
+                       return FutureUtils.completedExceptionally(e);
+               }
 
                final Execution execution = 
executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
                if (execution != null) {
-                       return execution.getState();
+                       return 
CompletableFuture.completedFuture(execution.getState());
                }
                else {
                        final IntermediateResult intermediateResult =
@@ -522,29 +540,33 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                                .getCurrentExecutionAttempt();
 
                                if (producerExecution.getAttemptId() == 
resultPartitionId.getProducerId()) {
-                                       return producerExecution.getState();
+                                       return 
CompletableFuture.completedFuture(producerExecution.getState());
                                } else {
-                                       throw new 
PartitionProducerDisposedException(resultPartitionId);
+                                       return 
FutureUtils.completedExceptionally(new 
PartitionProducerDisposedException(resultPartitionId));
                                }
                        } else {
-                               throw new 
IllegalArgumentException("Intermediate data set with ID "
-                                               + intermediateResultId + " not 
found.");
+                               return FutureUtils.completedExceptionally(new 
IllegalArgumentException("Intermediate data set with ID "
+                                               + intermediateResultId + " not 
found."));
                        }
                }
        }
 
-       @RpcMethod
-       public Acknowledge scheduleOrUpdateConsumers(
+       @Override
+       public CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(
                        final UUID leaderSessionID,
-                       final ResultPartitionID partitionID) throws Exception
-       {
-               validateLeaderSessionId(leaderSessionID);
+                       final ResultPartitionID partitionID,
+                       final Time timeout) {
+               try {
+                       validateLeaderSessionId(leaderSessionID);
 
-               executionGraph.scheduleOrUpdateConsumers(partitionID);
-               return Acknowledge.get();
+                       executionGraph.scheduleOrUpdateConsumers(partitionID);
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               } catch (Exception e) {
+                       return FutureUtils.completedExceptionally(e);
+               }
        }
 
-       @RpcMethod
+       @Override
        public void disconnectTaskManager(final ResourceID resourceID, final 
Exception cause) {
                taskManagerHeartbeatManager.unmonitorTarget(resourceID);
                slotPoolGateway.releaseTaskManager(resourceID);
@@ -558,13 +580,13 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        }
 
        // TODO: This method needs a leader session ID
-       @RpcMethod
+       @Override
        public void acknowledgeCheckpoint(
                        final JobID jobID,
                        final ExecutionAttemptID executionAttemptID,
                        final long checkpointId,
                        final CheckpointMetrics checkpointMetrics,
-                       final SubtaskState checkpointState) throws 
CheckpointException {
+                       final SubtaskState checkpointState) {
 
                final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
                final AcknowledgeCheckpoint ackMessage = 
@@ -588,7 +610,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        }
 
        // TODO: This method needs a leader session ID
-       @RpcMethod
+       @Override
        public void declineCheckpoint(
                        final JobID jobID,
                        final ExecutionAttemptID executionAttemptID,
@@ -616,8 +638,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                }
        }
 
-       @RpcMethod
-       public KvStateLocation lookupKvStateLocation(final String 
registrationName) throws Exception {
+       @Override
+       public CompletableFuture<KvStateLocation> lookupKvStateLocation(final 
String registrationName) {
                if (log.isDebugEnabled()) {
                        log.debug("Lookup key-value state for job {} with 
registration " +
                                        "name {}.", jobGraph.getJobID(), 
registrationName);
@@ -626,13 +648,13 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                final KvStateLocationRegistry registry = 
executionGraph.getKvStateLocationRegistry();
                final KvStateLocation location = 
registry.getKvStateLocation(registrationName);
                if (location != null) {
-                       return location;
+                       return CompletableFuture.completedFuture(location);
                } else {
-                       throw new UnknownKvStateLocation(registrationName);
+                       return FutureUtils.completedExceptionally(new 
UnknownKvStateLocation(registrationName));
                }
        }
 
-       @RpcMethod
+       @Override
        public void notifyKvStateRegistered(
                        final JobVertexID jobVertexId,
                        final KeyGroupRange keyGroupRange,
@@ -653,7 +675,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                }
        }
 
-       @RpcMethod
+       @Override
        public void notifyKvStateUnregistered(
                        JobVertexID jobVertexId,
                        KeyGroupRange keyGroupRange,
@@ -672,25 +694,31 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                }
        }
 
-       @RpcMethod
-       public ClassloadingProps requestClassloadingProps() throws Exception {
-               return new 
ClassloadingProps(libraryCacheManager.getBlobServerPort(),
+       @Override
+       public CompletableFuture<ClassloadingProps> requestClassloadingProps() {
+               return CompletableFuture.completedFuture(
+                       new 
ClassloadingProps(libraryCacheManager.getBlobServerPort(),
                                executionGraph.getRequiredJarFiles(),
-                               executionGraph.getRequiredClasspaths());
+                               executionGraph.getRequiredClasspaths()));
        }
 
-       @RpcMethod
-       public CompletableFuture<Iterable<SlotOffer>> offerSlots(
+       @Override
+       public CompletableFuture<Collection<SlotOffer>> offerSlots(
                        final ResourceID taskManagerId,
                        final Iterable<SlotOffer> slots,
-                       final UUID leaderId) throws Exception {
+                       final UUID leaderId,
+                       final Time timeout) {
 
-               validateLeaderSessionId(leaderId);
+               try {
+                       validateLeaderSessionId(leaderId);
+               } catch (LeaderIdMismatchException e) {
+                       return FutureUtils.completedExceptionally(e);
+               }
 
                Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = 
registeredTaskManagers.get(taskManagerId);
 
                if (taskManager == null) {
-                       throw new Exception("Unknown TaskManager " + 
taskManagerId);
+                       return FutureUtils.completedExceptionally(new 
Exception("Unknown TaskManager " + taskManagerId));
                }
 
                final JobID jid = jobGraph.getJobID();
@@ -716,34 +744,40 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                return slotPoolGateway.offerSlots(slotsAndOffers);
        }
 
-       @RpcMethod
-       public void failSlot(final ResourceID taskManagerId,
+       @Override
+       public void failSlot(
+                       final ResourceID taskManagerId,
                        final AllocationID allocationId,
                        final UUID leaderId,
-                       final Exception cause) throws Exception
-       {
-               validateLeaderSessionId(leaderSessionID);
+                       final Exception cause) {
 
-               if (!registeredTaskManagers.containsKey(taskManagerId)) {
-                       throw new Exception("Unknown TaskManager " + 
taskManagerId);
+               try {
+                       validateLeaderSessionId(leaderSessionID);
+               } catch (LeaderIdMismatchException e) {
+                       log.warn("Cannot fail slot " + allocationId + '.', e);
                }
 
-               slotPoolGateway.failAllocation(allocationId, cause);
+               if (registeredTaskManagers.containsKey(taskManagerId)) {
+                       slotPoolGateway.failAllocation(allocationId, cause);
+               } else {
+                       log.warn("Cannot fail slot " + allocationId + " because 
the TaskManager " +
+                       taskManagerId + " is unknown.");
+               }
        }
 
-       @RpcMethod
+       @Override
        public CompletableFuture<RegistrationResponse> registerTaskManager(
                        final String taskManagerRpcAddress,
                        final TaskManagerLocation taskManagerLocation,
-                       final UUID leaderId) throws Exception
-       {
-               if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
+                       final UUID leaderId,
+                       final Time timeout) {
+               if (!Objects.equals(leaderSessionID, leaderId)) {
                        log.warn("Discard registration from TaskExecutor {} at 
({}) because the expected " +
                                                        "leader session ID {} 
did not equal the received leader session ID {}.",
-                                       taskManagerLocation.getResourceID(), 
taskManagerRpcAddress,
-                                       JobMaster.this.leaderSessionID, 
leaderId);
-                       throw new Exception("Leader id not match, expected: " + 
JobMaster.this.leaderSessionID
-                                       + ", actual: " + leaderId);
+                                       taskManagerLocation.getResourceID(), 
taskManagerRpcAddress, leaderSessionID, leaderId);
+                       return FutureUtils.completedExceptionally(
+                               new Exception("Leader id not match, expected: " 
+
+                                       leaderSessionID + ", actual: " + 
leaderId));
                }
 
                final ResourceID taskManagerId = 
taskManagerLocation.getResourceID();
@@ -790,13 +824,17 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                }
        }
 
-       @RpcMethod
+       @Override
        public void disconnectResourceManager(
                        final UUID jobManagerLeaderId,
                        final UUID resourceManagerLeaderId,
-                       final Exception cause) throws Exception {
+                       final Exception cause) {
 
-               validateLeaderSessionId(jobManagerLeaderId);
+               try {
+                       validateLeaderSessionId(jobManagerLeaderId);
+               } catch (LeaderIdMismatchException e) {
+                       log.warn("Cannot disconnect resource manager " + 
resourceManagerLeaderId + '.', e);
+               }
 
                if (resourceManagerConnection != null
                                && 
resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) {
@@ -804,12 +842,12 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                }
        }
 
-       @RpcMethod
+       @Override
        public void heartbeatFromTaskManager(final ResourceID resourceID) {
                taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
        }
 
-       @RpcMethod
+       @Override
        public void heartbeatFromResourceManager(final ResourceID resourceID) {
                resourceManagerHeartbeatManager.requestHeartbeat(resourceID, 
null);
        }
@@ -1100,11 +1138,17 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
        private class TaskManagerHeartbeatListener implements 
HeartbeatListener<Void, Void> {
 
+               private final JobMasterGateway jobMasterGateway;
+
+               private TaskManagerHeartbeatListener(JobMasterGateway 
jobMasterGateway) {
+                       this.jobMasterGateway = 
Preconditions.checkNotNull(jobMasterGateway);
+               }
+
                @Override
                public void notifyHeartbeatTimeout(ResourceID resourceID) {
                        log.info("Heartbeat of TaskManager with id {} timed 
out.", resourceID);
 
-                       getSelf().disconnectTaskManager(
+                       jobMasterGateway.disconnectTaskManager(
                                resourceID,
                                new TimeoutException("Heartbeat of TaskManager 
with id " + resourceID + " timed out."));
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index e3611a3..b396cd6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
@@ -186,7 +187,7 @@ public interface JobMasterGateway extends 
CheckpointCoordinatorGateway {
         * @param timeout       for the rpc call
         * @return Future set of accepted slots.
         */
-       CompletableFuture<Iterable<SlotOffer>> offerSlots(
+       CompletableFuture<Collection<SlotOffer>> offerSlots(
                        final ResourceID taskManagerId,
                        final Iterable<SlotOffer> slots,
                        final UUID leaderId,

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 8318c09..c2b0590 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -44,9 +45,9 @@ import 
org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistrat
 import 
org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import 
org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions;
+import 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -77,13 +78,13 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * It offers the following methods as part of its rpc interface to interact 
with him remotely:
  * <ul>
- *     <li>{@link #registerJobManager(UUID, UUID, ResourceID, String, JobID)} 
registers a {@link JobMaster} at the resource manager</li>
- *     <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from 
the resource manager</li>
+ *     <li>{@link #registerJobManager(UUID, UUID, ResourceID, String, JobID, 
Time)} registers a {@link JobMaster} at the resource manager</li>
+ *     <li>{@link #requestSlot(UUID, UUID, SlotRequest, Time)} requests a slot 
from the resource manager</li>
  * </ul>
  */
 public abstract class ResourceManager<WorkerType extends Serializable>
-               extends RpcEndpoint<ResourceManagerGateway>
-               implements LeaderContender {
+               extends RpcEndpoint
+               implements ResourceManagerGateway, LeaderContender {
 
        public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
 
@@ -244,13 +245,14 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        //  RPC methods
        // 
------------------------------------------------------------------------
 
-       @RpcMethod
+       @Override
        public CompletableFuture<RegistrationResponse> registerJobManager(
                        final UUID resourceManagerLeaderId,
                        final UUID jobManagerLeaderId,
                        final ResourceID jobManagerResourceId,
                        final String jobManagerAddress,
-                       final JobID jobId) {
+                       final JobID jobId,
+                       final Time timeout) {
 
                checkNotNull(resourceManagerLeaderId);
                checkNotNull(jobManagerLeaderId);
@@ -352,12 +354,13 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
         *
         * @return The response by the ResourceManager.
         */
-       @RpcMethod
+       @Override
        public CompletableFuture<RegistrationResponse> registerTaskExecutor(
                        final UUID resourceManagerLeaderId,
                        final String taskExecutorAddress,
                        final ResourceID taskExecutorResourceId,
-                       final SlotReport slotReport) {
+                       final SlotReport slotReport,
+                       final Time timeout) {
 
                if (Objects.equals(leaderSessionId, resourceManagerLeaderId)) {
                        CompletableFuture<TaskExecutorGateway> 
taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, 
TaskExecutorGateway.class);
@@ -387,22 +390,22 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                }
        }
 
-       @RpcMethod
+       @Override
        public void heartbeatFromTaskManager(final ResourceID resourceID, final 
SlotReport slotReport) {
                taskManagerHeartbeatManager.receiveHeartbeat(resourceID, 
slotReport);
        }
 
-       @RpcMethod
+       @Override
        public void heartbeatFromJobManager(final ResourceID resourceID) {
                jobManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
        }
 
-       @RpcMethod
+       @Override
        public void disconnectTaskManager(final ResourceID resourceId, final 
Exception cause) {
                closeTaskManagerConnection(resourceId, cause);
        }
 
-       @RpcMethod
+       @Override
        public void disconnectJobManager(final JobID jobId, final Exception 
cause) {
                closeJobManagerConnection(jobId, cause);
        }
@@ -413,14 +416,15 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
         * @param slotRequest Slot request
         * @return Slot assignment
         */
-       @RpcMethod
-       public Acknowledge requestSlot(
+       @Override
+       public CompletableFuture<Acknowledge> requestSlot(
                        UUID jobMasterLeaderID,
                        UUID resourceManagerLeaderID,
-                       SlotRequest slotRequest) throws 
ResourceManagerException, LeaderSessionIDException {
+                       SlotRequest slotRequest,
+                       final Time timeout) {
 
                if (!Objects.equals(resourceManagerLeaderID, leaderSessionId)) {
-                       throw new 
LeaderSessionIDException(resourceManagerLeaderID, leaderSessionId);
+                       return FutureUtils.completedExceptionally(new 
LeaderSessionIDException(resourceManagerLeaderID, leaderSessionId));
                }
 
                JobID jobId = slotRequest.getJobId();
@@ -433,15 +437,19 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                                        slotRequest.getJobId(),
                                        slotRequest.getAllocationId());
 
-                               slotManager.registerSlotRequest(slotRequest);
+                               try {
+                                       
slotManager.registerSlotRequest(slotRequest);
+                               } catch (SlotManagerException e) {
+                                       return 
FutureUtils.completedExceptionally(e);
+                               }
 
-                               return Acknowledge.get();
+                               return 
CompletableFuture.completedFuture(Acknowledge.get());
                        } else {
-                               throw new 
LeaderSessionIDException(jobMasterLeaderID, 
jobManagerRegistration.getLeaderID());
+                               return FutureUtils.completedExceptionally(new 
LeaderSessionIDException(jobMasterLeaderID, 
jobManagerRegistration.getLeaderID()));
                        }
 
                } else {
-                       throw new ResourceManagerException("Could not find 
registered job manager for job " + jobId + '.');
+                       return FutureUtils.completedExceptionally(new 
ResourceManagerException("Could not find registered job manager for job " + 
jobId + '.'));
                }
        }
 
@@ -451,7 +459,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
         * @param instanceID TaskExecutor's instance id
         * @param slotId The slot id of the available slot
         */
-       @RpcMethod
+       @Override
        public void notifySlotAvailable(
                        final UUID resourceManagerLeaderId,
                        final InstanceID instanceID,
@@ -487,7 +495,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
         *
         * @param address address of infoMessage listener to register to this 
resource manager
         */
-       @RpcMethod
+       @Override
        public void registerInfoMessageListener(final String address) {
                if(infoMessageListeners.containsKey(address)) {
                        log.warn("Receive a duplicate registration from info 
message listener on ({})", address);
@@ -514,7 +522,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
         * @param address of the  info message listener to unregister from this 
resource manager
         *
         */
-       @RpcMethod
+       @Override
        public void unRegisterInfoMessageListener(final String address) {
                infoMessageListeners.remove(address);
        }
@@ -525,7 +533,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
         * @param finalStatus of the Flink application
         * @param optionalDiagnostics for the Flink application
         */
-       @RpcMethod
+       @Override
        public void shutDownCluster(final ApplicationStatus finalStatus, final 
String optionalDiagnostics) {
                log.info("Shut down cluster because application is in {}, 
diagnostics {}.", finalStatus, optionalDiagnostics);
 
@@ -536,13 +544,13 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                }
        }
 
-       @RpcMethod
-       public Integer getNumberOfRegisteredTaskManagers(UUID 
requestLeaderSessionId) throws LeaderIdMismatchException {
+       @Override
+       public CompletableFuture<Integer> 
getNumberOfRegisteredTaskManagers(UUID requestLeaderSessionId) {
                if (Objects.equals(leaderSessionId, requestLeaderSessionId)) {
-                       return taskExecutors.size();
+                       return 
CompletableFuture.completedFuture(taskExecutors.size());
                }
                else {
-                       throw new LeaderIdMismatchException(leaderSessionId, 
requestLeaderSessionId);
+                       return FutureUtils.completedExceptionally(new 
LeaderIdMismatchException(leaderSessionId, requestLeaderSessionId));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
index b3fea77..8a0e5f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
@@ -29,9 +29,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public final class MainThreadValidatorUtil {
 
-       private final RpcEndpoint<?> endpoint;
+       private final RpcEndpoint endpoint;
 
-       public MainThreadValidatorUtil(RpcEndpoint<?> endpoint) {
+       public MainThreadValidatorUtil(RpcEndpoint endpoint) {
                this.endpoint = checkNotNull(endpoint);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index b5bbc2b..980ae48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.ReflectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,6 +29,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -51,11 +51,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * of Erlang or Akka.
  *
  * <p>The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link 
#callAsync(Callable, Time)}
-  * and the {@link #getMainThreadExecutor()} to execute code in the RPC 
endpoint's main thread.
- *
- * @param <C> The RPC gateway counterpart for the implementing RPC endpoint
+ * and the {@link #getMainThreadExecutor()} to execute code in the RPC 
endpoint's main thread.
  */
-public abstract class RpcEndpoint<C extends RpcGateway> {
+public abstract class RpcEndpoint implements RpcGateway {
 
        protected final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -67,11 +65,8 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
        /** Unique identifier for this rpc endpoint */
        private final String endpointId;
 
-       /** Class of the self gateway */
-       private final Class<C> selfGatewayType;
-
-       /** Self gateway which can be used to schedule asynchronous calls on 
yourself */
-       private final C self;
+       /** Interface to access the underlying rpc server */
+       private final RpcServer rpcServer;
 
        /** The main thread executor to be used to execute future callbacks in 
the main thread
         * of the executing rpc server. */
@@ -90,12 +85,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
                this.rpcService = checkNotNull(rpcService, "rpcService");
                this.endpointId = checkNotNull(endpointId, "endpointId");
 
-               // IMPORTANT: Don't change order of selfGatewayType and self 
because rpcService.startServer
-               // requires that selfGatewayType has been initialized
-               this.selfGatewayType = determineSelfGatewayType();
-               this.self = rpcService.startServer(this);
+               this.rpcServer = rpcService.startServer(this);
 
-               this.mainThreadExecutor = new 
MainThreadExecutor((MainThreadExecutable) self);
+               this.mainThreadExecutor = new MainThreadExecutor(rpcServer);
        }
 
        /**
@@ -108,15 +100,6 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
        }
 
        /**
-        * Returns the class of the self gateway type.
-        *
-        * @return Class of the self gateway type
-        */
-       public final Class<C> getSelfGatewayType() {
-               return selfGatewayType;
-       }
-
-       /**
         * Returns the rpc endpoint's identifier.
         *
         * @return Rpc endpoint's identifier.
@@ -139,7 +122,15 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * @throws Exception indicating that something went wrong while 
starting the RPC endpoint
         */
        public void start() throws Exception {
-               ((StartStoppable) self).start();
+               rpcServer.start();
+       }
+
+       /**
+        * Stops the rpc endpoint. This tells the underlying rpc server that 
the rpc endpoint is
+        * no longer ready to process remote procedure calls.
+        */
+       protected final void stop() {
+               rpcServer.stop();
        }
 
        /**
@@ -161,7 +152,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * via {@link #getTerminationFuture()}} and wait on its completion.
         */
        public final void shutDown() {
-               rpcService.stopServer(self);
+               rpcService.stopServer(rpcServer);
        }
 
        // 
------------------------------------------------------------------------
@@ -169,15 +160,25 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
        // 
------------------------------------------------------------------------
 
        /**
-        * Get self-gateway which should be used to run asynchronous RPC calls 
on this endpoint.
+        * Returns a self gateway of the specified type which can be used to 
issue asynchronous
+        * calls against the RpcEndpoint.
         *
-        * <p><b>IMPORTANT</b>: Always issue local method calls via the 
self-gateway if the current thread
-        * is not the main thread of the underlying rpc server, e.g. from 
within a future callback.
+        * <p>IMPORTANT: The self gateway type must be implemented by the 
RpcEndpoint. Otherwise
+        * the method will fail.
         *
-        * @return The self gateway
+        * @param selfGatewayType class of the self gateway type
+        * @param <C> type of the self gateway to create
+        * @return Self gateway of the specified type which can be used to 
issue asynchronous rpcs
         */
-       public C getSelf() {
-               return self;
+       public <C extends RpcGateway> C getSelfGateway(Class<C> 
selfGatewayType) {
+               if (selfGatewayType.isInstance(rpcServer)) {
+                       @SuppressWarnings("unchecked")
+                       C selfGateway = ((C) rpcServer);
+
+                       return selfGateway;
+               } else {
+                       throw new RuntimeException("RpcEndpoint does not 
implement the RpcGateway interface of type " + selfGatewayType + '.');
+               }
        }
 
        /**
@@ -186,8 +187,19 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         *
         * @return Fully qualified address of the underlying RPC endpoint
         */
+       @Override
        public String getAddress() {
-               return self.getAddress();
+               return rpcServer.getAddress();
+       }
+
+       /**
+        * Gets the hostname of the underlying RPC endpoint.
+        *
+        * @return Hostname on which the RPC endpoint is running
+        */
+       @Override
+       public String getHostname() {
+               return rpcServer.getHostname();
        }
 
        /**
@@ -215,7 +227,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * @return Future which is completed when the rpc endpoint has been 
terminated.
         */
        public CompletableFuture<Void> getTerminationFuture() {
-               return ((SelfGateway)self).getTerminationFuture();
+               return rpcServer.getTerminationFuture();
        }
 
        // 
------------------------------------------------------------------------
@@ -229,7 +241,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * @param runnable Runnable to be executed in the main thread of the 
underlying RPC endpoint
         */
        protected void runAsync(Runnable runnable) {
-               ((MainThreadExecutable) self).runAsync(runnable);
+               rpcServer.runAsync(runnable);
        }
 
        /**
@@ -251,13 +263,13 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * @param delay    The delay after which the runnable will be executed
         */
        protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit 
unit) {
-               ((MainThreadExecutable) self).scheduleRunAsync(runnable, 
unit.toMillis(delay));
+               rpcServer.scheduleRunAsync(runnable, unit.toMillis(delay));
        }
 
        /**
         * Execute the callable in the main thread of the underlying RPC 
service, returning a future for
         * the result of the callable. If the callable is not completed within 
the given timeout, then
-        * the future will be failed with a {@link 
java.util.concurrent.TimeoutException}.
+        * the future will be failed with a {@link TimeoutException}.
         *
         * @param callable Callable to be executed in the main thread of the 
underlying rpc server
         * @param timeout Timeout for the callable to be completed
@@ -265,7 +277,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * @return Future for the result of the callable.
         */
        protected <V> CompletableFuture<V> callAsync(Callable<V> callable, Time 
timeout) {
-               return ((MainThreadExecutable) self).callAsync(callable, 
timeout);
+               return rpcServer.callAsync(callable, timeout);
        }
 
        // 
------------------------------------------------------------------------
@@ -311,23 +323,4 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
                        gateway.runAsync(runnable);
                }
        }
-
-       /**
-        * Determines the self gateway type specified in one of the subclasses 
which extend this class.
-        * May traverse multiple class hierarchies until a Gateway type is 
found as a first type argument.
-        * @return Class<C> The determined self gateway type
-        */
-       private Class<C> determineSelfGatewayType() {
-
-               // determine self gateway type
-               Class<?> c = getClass();
-               Class<C> determinedSelfGatewayType;
-               do {
-                       determinedSelfGatewayType = 
ReflectionUtil.getTemplateType1(c);
-                       // check if super class contains self gateway type in 
next loop
-                       c = c.getSuperclass();
-               } while 
(!RpcGateway.class.isAssignableFrom(determinedSelfGatewayType));
-
-               return determinedSelfGatewayType;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
deleted file mode 100644
index e4b0e94..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Inherited;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Annotation for rpc method in a {@link RpcEndpoint} implementation. Every 
rpc method must have a
- * respective counterpart in the {@link RpcGateway} implementation for this 
rpc server. The
- * RpcCompletenessTest makes sure that the set of rpc methods in a rpc server 
and the set of
- * gateway methods in the corresponding gateway implementation are identical.
- */
-@Inherited
-@Target(ElementType.METHOD)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface RpcMethod {
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
new file mode 100644
index 0000000..9c645cc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface to access the rpc server of an {@link RpcEndpoint}; also used as its self gateway.
+ */
+public interface RpcServer extends StartStoppable, MainThreadExecutable, 
RpcGateway {
+
+       /**
+        * Return a future which is completed when the rpc endpoint has been 
terminated.
+        *
+        * @return Future indicating when the rpc endpoint has been terminated
+        */
+       CompletableFuture<Void> getTerminationFuture();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index a92f3e2..3b5a5e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -66,20 +66,18 @@ public interface RpcService {
        /**
         * Start a rpc server which forwards the remote procedure calls to the 
provided rpc endpoint.
         *
-        * @param rpcEndpoint Rpc protocl to dispath the rpcs to
-        * @param <S> Type of the rpc endpoint
-        * @param <C> Type of the self rpc gateway associated with the rpc 
server
+        * @param rpcEndpoint Rpc protocol to dispatch the rpcs to
+        * @param <C> Type of the rpc endpoint
         * @return Self gateway to dispatch remote procedure calls to oneself
         */
-       <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S 
rpcEndpoint);
+       <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C 
rpcEndpoint);
 
        /**
         * Stop the underlying rpc server of the provided self gateway.
         *
         * @param selfGateway Self gateway describing the underlying rpc server
-        * @param <C> Type of the rpc gateway
         */
-       <C extends RpcGateway> void stopServer(C selfGateway);
+       void stopServer(RpcServer selfGateway);
 
        /**
         * Stop the rpc service shutting down all started rpc servers.

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
new file mode 100644
index 0000000..9738970
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Utility functions for Flink's RPC implementation.
+ */
+public class RpcUtils {
+       /**
+        * Extracts all {@link RpcGateway} interfaces declared by the given clazz or one of its superclasses.
+        *
+        * @param clazz from which to extract the implemented RpcGateway interfaces (null yields an empty set)
+        * @return A newly created, modifiable set of all implemented RpcGateway interfaces
+        */
+       public static Set<Class<? extends RpcGateway>> extractImplementedRpcGateways(Class<?> clazz) {
+               Set<Class<? extends RpcGateway>> interfaces = new HashSet<>();
+
+               while (clazz != null) {
+                       for (Class<?> interfaze : clazz.getInterfaces()) {
+                               if (RpcGateway.class.isAssignableFrom(interfaze)) {
+                                       interfaces.add(interfaze.asSubclass(RpcGateway.class));
+                               }
+                       }
+
+                       clazz = clazz.getSuperclass();
+               }
+
+               return interfaces;
+       }
+
+       // We don't want this class to be instantiable
+       private RpcUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
deleted file mode 100644
index d39b1ef..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc;
-
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Interface for self gateways
- */
-public interface SelfGateway {
-
-       /**
-        * Return a future which is completed when the rpc endpoint has been 
terminated.
-        *
-        * @return Future indicating when the rpc endpoint has been terminated
-        */
-       CompletableFuture<Void> getTerminationFuture();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index ae6b832..0521f2e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -21,10 +21,9 @@ package org.apache.flink.runtime.rpc.akka;
 import akka.actor.ActorRef;
 import akka.pattern.Patterns;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
-import org.apache.flink.runtime.rpc.SelfGateway;
+import org.apache.flink.runtime.rpc.RpcServer;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.StartStoppable;
@@ -38,11 +37,12 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
-import java.util.BitSet;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -55,7 +55,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link 
AkkaRpcActor} where it is
  * executed.
  */
-class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutable, StartStoppable, SelfGateway {
+class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
RpcServer {
        private static final Logger LOG = 
LoggerFactory.getLogger(AkkaInvocationHandler.class);
 
        /**
@@ -88,7 +88,7 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
                        ActorRef rpcEndpoint,
                        Time timeout,
                        long maximumFramesize,
-                       CompletableFuture<Void> terminationFuture) {
+                       @Nullable CompletableFuture<Void> terminationFuture) {
 
                this.address = Preconditions.checkNotNull(address);
                this.hostname = Preconditions.checkNotNull(hostname);
@@ -105,9 +105,12 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
 
                Object result;
 
-               if (declaringClass.equals(AkkaGateway.class) || 
declaringClass.equals(MainThreadExecutable.class) ||
-                       declaringClass.equals(Object.class) || 
declaringClass.equals(StartStoppable.class) ||
-                       declaringClass.equals(RpcGateway.class) || 
declaringClass.equals(SelfGateway.class)) {
+               if (declaringClass.equals(AkkaGateway.class) ||
+                       declaringClass.equals(Object.class) ||
+                       declaringClass.equals(RpcGateway.class) ||
+                       declaringClass.equals(StartStoppable.class) ||
+                       declaringClass.equals(MainThreadExecutable.class) ||
+                       declaringClass.equals(RpcServer.class)) {
                        result = method.invoke(this, args);
                } else {
                        String methodName = method.getName();
@@ -115,24 +118,19 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
                        Annotation[][] parameterAnnotations = 
method.getParameterAnnotations();
                        Time futureTimeout = 
extractRpcTimeout(parameterAnnotations, args, timeout);
 
-                       Tuple2<Class<?>[], Object[]> filteredArguments = 
filterArguments(
-                               parameterTypes,
-                               parameterAnnotations,
-                               args);
-
                        RpcInvocation rpcInvocation;
 
                        if (isLocal) {
                                rpcInvocation = new LocalRpcInvocation(
                                        methodName,
-                                       filteredArguments.f0,
-                                       filteredArguments.f1);
+                                       parameterTypes,
+                                       args);
                        } else {
                                try {
                                        RemoteRpcInvocation remoteRpcInvocation 
= new RemoteRpcInvocation(
                                                methodName,
-                                               filteredArguments.f0,
-                                               filteredArguments.f1);
+                                               parameterTypes,
+                                               args);
 
                                        if (remoteRpcInvocation.getSize() > 
maximumFramesize) {
                                                throw new IOException("The rpc 
invocation size exceeds the maximum akka framesize.");
@@ -249,62 +247,6 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
        }
 
        /**
-        * Removes all {@link RpcTimeout} annotated parameters from the 
parameter type and argument
-        * list.
-        *
-        * @param parameterTypes Array of parameter types
-        * @param parameterAnnotations Array of parameter annotations
-        * @param args Arary of arguments
-        * @return Tuple of filtered parameter types and arguments which no 
longer contain the
-        * {@link RpcTimeout} annotated parameter types and arguments
-        */
-       private static Tuple2<Class<?>[], Object[]> filterArguments(
-               Class<?>[] parameterTypes,
-               Annotation[][] parameterAnnotations,
-               Object[] args) {
-
-               Class<?>[] filteredParameterTypes;
-               Object[] filteredArgs;
-
-               if (args == null) {
-                       filteredParameterTypes = parameterTypes;
-                       filteredArgs = null;
-               } else {
-                       Preconditions.checkArgument(parameterTypes.length == 
parameterAnnotations.length);
-                       Preconditions.checkArgument(parameterAnnotations.length 
== args.length);
-
-                       BitSet isRpcTimeoutParameter = new 
BitSet(parameterTypes.length);
-                       int numberRpcParameters = parameterTypes.length;
-
-                       for (int i = 0; i < parameterTypes.length; i++) {
-                               if (isRpcTimeout(parameterAnnotations[i])) {
-                                       isRpcTimeoutParameter.set(i);
-                                       numberRpcParameters--;
-                               }
-                       }
-
-                       if (numberRpcParameters == parameterTypes.length) {
-                               filteredParameterTypes = parameterTypes;
-                               filteredArgs = args;
-                       } else {
-                               filteredParameterTypes = new 
Class<?>[numberRpcParameters];
-                               filteredArgs = new Object[numberRpcParameters];
-                               int counter = 0;
-
-                               for (int i = 0; i < parameterTypes.length; i++) 
{
-                                       if (!isRpcTimeoutParameter.get(i)) {
-                                               filteredParameterTypes[counter] 
= parameterTypes[i];
-                                               filteredArgs[counter] = args[i];
-                                               counter++;
-                                       }
-                               }
-                       }
-               }
-
-               return Tuple2.of(filteredParameterTypes, filteredArgs);
-       }
-
-       /**
         * Checks whether any of the annotations is of type {@link RpcTimeout}
         *
         * @param annotations Array of annotations

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 5845473..f557447 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -66,10 +66,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * stops processing messages. All messages which arrive when the processing is 
stopped, will be
  * discarded.
  *
- * @param <C> Type of the {@link RpcGateway} associated with the {@link 
RpcEndpoint}
  * @param <T> Type of the {@link RpcEndpoint}
  */
-class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends 
UntypedActor {
+class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
        
        private static final Logger LOG = 
LoggerFactory.getLogger(AkkaRpcActor.class);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 80267f9..ab851f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -33,18 +33,18 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.SelfGateway;
-import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.rpc.RpcServer;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.akka.messages.Shutdown;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
+import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import javax.annotation.Nonnull;
@@ -188,7 +188,7 @@ public class AkkaRpcService implements RpcService {
        }
 
        @Override
-       public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
+       public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
                checkNotNull(rpcEndpoint, "rpc endpoint");
 
                CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
@@ -225,22 +225,22 @@ public class AkkaRpcService implements RpcService {
                // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
                ClassLoader classLoader = getClass().getClassLoader();
 
+               Set<Class<? extends RpcGateway>> implementedRpcGateways = RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass());
+
+               implementedRpcGateways.add(RpcServer.class);
+               implementedRpcGateways.add(AkkaGateway.class);
+
                @SuppressWarnings("unchecked")
-               C self = (C) Proxy.newProxyInstance(
+               RpcServer server = (RpcServer) Proxy.newProxyInstance(
                        classLoader,
-                       new Class<?>[]{
-                               rpcEndpoint.getSelfGatewayType(),
-                               SelfGateway.class,
-                               MainThreadExecutable.class,
-                               StartStoppable.class,
-                               AkkaGateway.class},
+                       implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
                        akkaInvocationHandler);
 
-               return self;
+               return server;
        }
 
        @Override
-       public void stopServer(RpcGateway selfGateway) {
+       public void stopServer(RpcServer selfGateway) {
                if (selfGateway instanceof AkkaGateway) {
                        AkkaGateway akkaClient = (AkkaGateway) selfGateway;
 
@@ -312,7 +312,7 @@ public class AkkaRpcService implements RpcService {
 
        @Override
        public <T> CompletableFuture<T> execute(Callable<T> callable) {
-               scala.concurrent.Future<T> scalaFuture = Futures.future(callable, actorSystem.dispatcher());
+               Future<T> scalaFuture = Futures.future(callable, actorSystem.dispatcher());
 
                return FutureUtils.toJava(scalaFuture);
        }

Reply via email to